aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xdev/audit-release/audit_release.py2
-rwxr-xr-xdev/run-tests.py1
-rw-r--r--dev/sparktestsupport/modules.py34
-rw-r--r--examples/pom.xml5
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java75
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala70
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala67
-rw-r--r--external/flume-assembly/pom.xml168
-rw-r--r--external/flume-sink/pom.xml129
-rw-r--r--external/flume-sink/src/main/avro/sparkflume.avdl40
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala127
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala166
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala171
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala35
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala28
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala252
-rw-r--r--external/flume-sink/src/test/resources/log4j.properties28
-rw-r--r--external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala218
-rw-r--r--external/flume/pom.xml78
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala72
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala166
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala205
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala123
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala117
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala311
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala209
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java21
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala23
-rw-r--r--external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java44
-rw-r--r--external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java44
-rw-r--r--external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java36
-rw-r--r--external/flume/src/test/resources/log4j.properties28
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala48
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala130
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala103
-rw-r--r--pom.xml48
-rw-r--r--project/SparkBuild.scala22
-rw-r--r--python/pyspark/streaming/flume.py140
-rw-r--r--python/pyspark/streaming/tests.py168
-rw-r--r--sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java33
40 files changed, 3765 insertions, 20 deletions
diff --git a/dev/audit-release/audit_release.py b/dev/audit-release/audit_release.py
index 426b3117f1..ee72da4df0 100755
--- a/dev/audit-release/audit_release.py
+++ b/dev/audit-release/audit_release.py
@@ -116,7 +116,7 @@ original_dir = os.getcwd()
# dependencies within those projects.
modules = [
"spark-core", "spark-mllib", "spark-streaming", "spark-repl",
- "spark-graphx", "spark-streaming-kafka",
+ "spark-graphx", "spark-streaming-flume", "spark-streaming-kafka",
"spark-catalyst", "spark-sql", "spark-hive", "spark-streaming-kinesis-asl"
]
modules = map(lambda m: "%s_%s" % (m, SCALA_BINARY_VERSION), modules)
diff --git a/dev/run-tests.py b/dev/run-tests.py
index d940cdad3e..c2944747ee 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -337,6 +337,7 @@ def build_spark_sbt(hadoop_version):
build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
sbt_goals = ["package",
"streaming-kafka-assembly/assembly",
+ "streaming-flume-assembly/assembly",
"streaming-kinesis-asl-assembly/assembly"]
profiles_and_goals = build_profiles + sbt_goals
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index d1184886e2..bb04ec6ee6 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -223,6 +223,39 @@ streaming_kafka = Module(
)
+streaming_flume_sink = Module(
+ name="streaming-flume-sink",
+ dependencies=[streaming],
+ source_file_regexes=[
+ "external/flume-sink",
+ ],
+ sbt_test_goals=[
+ "streaming-flume-sink/test",
+ ]
+)
+
+
+streaming_flume = Module(
+ name="streaming-flume",
+ dependencies=[streaming],
+ source_file_regexes=[
+ "external/flume",
+ ],
+ sbt_test_goals=[
+ "streaming-flume/test",
+ ]
+)
+
+
+streaming_flume_assembly = Module(
+ name="streaming-flume-assembly",
+ dependencies=[streaming_flume, streaming_flume_sink],
+ source_file_regexes=[
+ "external/flume-assembly",
+ ]
+)
+
+
mllib = Module(
name="mllib",
dependencies=[streaming, sql],
@@ -294,6 +327,7 @@ pyspark_streaming = Module(
pyspark_core,
streaming,
streaming_kafka,
+ streaming_flume_assembly,
streaming_kinesis_asl
],
source_file_regexes=[
diff --git a/examples/pom.xml b/examples/pom.xml
index 1aa730c0dc..b7f37978b9 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -67,6 +67,11 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
new file mode 100644
index 0000000000..da56637fe8
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.flume.FlumeUtils;
+import org.apache.spark.streaming.flume.SparkFlumeEvent;
+
+/**
+ * Produces a count of events received from Flume.
+ *
+ * This should be used in conjunction with an AvroSink in Flume. It will start
+ * an Avro server on at the request host:port address and listen for requests.
+ * Your Flume AvroSink should be pointed to this address.
+ *
+ * Usage: JavaFlumeEventCount <host> <port>
+ * <host> is the host the Flume receiver will be started on - a receiver
+ * creates a server and listens for flume events.
+ * <port> is the port the Flume receiver will listen on.
+ *
+ * To run this example:
+ * `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>`
+ */
+public final class JavaFlumeEventCount {
+ private JavaFlumeEventCount() {
+ }
+
+ public static void main(String[] args) {
+ if (args.length != 2) {
+ System.err.println("Usage: JavaFlumeEventCount <host> <port>");
+ System.exit(1);
+ }
+
+ StreamingExamples.setStreamingLogLevels();
+
+ String host = args[0];
+ int port = Integer.parseInt(args[1]);
+
+ Duration batchInterval = new Duration(2000);
+ SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
+ JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
+ JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
+
+ flumeStream.count();
+
+ flumeStream.count().map(new Function<Long, String>() {
+ @Override
+ public String call(Long in) {
+ return "Received " + in + " flume events.";
+ }
+ }).print();
+
+ ssc.start();
+ ssc.awaitTermination();
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
new file mode 100644
index 0000000000..91e52e4eff
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.flume._
+import org.apache.spark.util.IntParam
+
+/**
+ * Produces a count of events received from Flume.
+ *
+ * This should be used in conjunction with an AvroSink in Flume. It will start
+ * an Avro server on at the request host:port address and listen for requests.
+ * Your Flume AvroSink should be pointed to this address.
+ *
+ * Usage: FlumeEventCount <host> <port>
+ * <host> is the host the Flume receiver will be started on - a receiver
+ * creates a server and listens for flume events.
+ * <port> is the port the Flume receiver will listen on.
+ *
+ * To run this example:
+ * `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> `
+ */
+object FlumeEventCount {
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println(
+ "Usage: FlumeEventCount <host> <port>")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ val Array(host, IntParam(port)) = args
+
+ val batchInterval = Milliseconds(2000)
+
+ // Create the context and set the batch size
+ val sparkConf = new SparkConf().setAppName("FlumeEventCount")
+ val ssc = new StreamingContext(sparkConf, batchInterval)
+
+ // Create a flume stream
+ val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
+
+ // Print out the count of events received from this server in each batch
+ stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
new file mode 100644
index 0000000000..dd725d72c2
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.flume._
+import org.apache.spark.util.IntParam
+
+/**
+ * Produces a count of events received from Flume.
+ *
+ * This should be used in conjunction with the Spark Sink running in a Flume agent. See
+ * the Spark Streaming programming guide for more details.
+ *
+ * Usage: FlumePollingEventCount <host> <port>
+ * `host` is the host on which the Spark Sink is running.
+ * `port` is the port at which the Spark Sink is listening.
+ *
+ * To run this example:
+ * `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] `
+ */
+object FlumePollingEventCount {
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println(
+ "Usage: FlumePollingEventCount <host> <port>")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ val Array(host, IntParam(port)) = args
+
+ val batchInterval = Milliseconds(2000)
+
+ // Create the context and set the batch size
+ val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
+ val ssc = new StreamingContext(sparkConf, batchInterval)
+
+ // Create a flume stream that polls the Spark Sink running in a Flume agent
+ val stream = FlumeUtils.createPollingStream(ssc, host, port)
+
+ // Print out the count of events received from this server in each batch
+ stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
+// scalastyle:on println
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
new file mode 100644
index 0000000000..ac15b93c04
--- /dev/null
+++ b/external/flume-assembly/pom.xml
@@ -0,0 +1,168 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume-assembly_2.11</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project External Flume Assembly</name>
+ <url>http://spark.apache.org/</url>
+
+ <properties>
+ <hadoop.deps.scope>provided</hadoop.deps.scope>
+ <sbt.project.name>streaming-flume-assembly</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!--
+ Demote already included in the Spark assembly. These are transitive dependencies of flume
+ or spark-streaming-flume, and this need to be explicitly included even through the parent
+ pom may declare them with ${hadoop.deps.scope}.
+ -->
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <classifier>${avro.mapred.classifier}</classifier>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>reference.conf</resource>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+ <resource>log4j.properties</resource>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>flume-provided</id>
+ <properties>
+ <flume.deps.scope>provided</flume.deps.scope>
+ </properties>
+ </profile>
+ </profiles>
+</project>
+
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
new file mode 100644
index 0000000000..e4effe158c
--- /dev/null
+++ b/external/flume-sink/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume-sink_2.11</artifactId>
+ <properties>
+ <sbt.project.name>streaming-flume-sink</sbt.project.name>
+ </properties>
+ <packaging>jar</packaging>
+ <name>Spark Project External Flume Sink</name>
+ <url>http://spark.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ <exclusions>
+ <!-- Guava is excluded to avoid its use in this module. -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <!--
+ Exclude libthrift since the flume poms seem to confuse sbt, which fails to find the
+ dependency.
+ -->
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+ <dependency>
+ <!-- Add Guava in test scope since flume actually needs it. -->
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <!--
+ Netty explicitly added in test as it has been excluded from
+ Flume dependency (to avoid runtime problems when running with
+ Spark) but unit tests need it. Version of Netty on which
+ Flume 1.4.0 depends on is "3.4.0.Final" .
+ -->
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.4.0.Final</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>${avro.version}</version>
+ <configuration>
+ <!-- Generate the output in the same directory as the sbt-avro-plugin -->
+ <outputDirectory>${project.basedir}/target/scala-${scala.binary.version}/src_managed/main/compiled_avro</outputDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>idl-protocol</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <!-- Disable all relocations defined in the parent pom. -->
+ <relocations combine.self="override" />
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/external/flume-sink/src/main/avro/sparkflume.avdl b/external/flume-sink/src/main/avro/sparkflume.avdl
new file mode 100644
index 0000000000..8806e863ac
--- /dev/null
+++ b/external/flume-sink/src/main/avro/sparkflume.avdl
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+@namespace("org.apache.spark.streaming.flume.sink")
+
+protocol SparkFlumeProtocol {
+
+ record SparkSinkEvent {
+ map<string> headers;
+ bytes body;
+ }
+
+ record EventBatch {
+ string errorMsg = ""; // If this is empty it is a valid message, else it represents an error
+ string sequenceNumber;
+ array<SparkSinkEvent> events;
+ }
+
+ EventBatch getEventBatch (int n);
+
+ void ack (string sequenceNumber);
+
+ void nack (string sequenceNumber);
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
new file mode 100644
index 0000000000..09d3fe91e4
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * Copy of the org.apache.spark.Logging for being used in the Spark Sink.
+ * The org.apache.spark.Logging is not used so that all of Spark is not brought
+ * in as a dependency.
+ */
+private[sink] trait Logging {
+ // Make the log field transient so that objects with Logging can
+ // be serialized and used on another machine
+ @transient private var _log: Logger = null
+
+ // Method to get or create the logger for this object
+ protected def log: Logger = {
+ if (_log == null) {
+ initializeIfNecessary()
+ var className = this.getClass.getName
+ // Ignore trailing $'s in the class names for Scala objects
+ if (className.endsWith("$")) {
+ className = className.substring(0, className.length - 1)
+ }
+ _log = LoggerFactory.getLogger(className)
+ }
+ _log
+ }
+
+ // Log methods that take only a String
+ protected def logInfo(msg: => String) {
+ if (log.isInfoEnabled) log.info(msg)
+ }
+
+ protected def logDebug(msg: => String) {
+ if (log.isDebugEnabled) log.debug(msg)
+ }
+
+ protected def logTrace(msg: => String) {
+ if (log.isTraceEnabled) log.trace(msg)
+ }
+
+ protected def logWarning(msg: => String) {
+ if (log.isWarnEnabled) log.warn(msg)
+ }
+
+ protected def logError(msg: => String) {
+ if (log.isErrorEnabled) log.error(msg)
+ }
+
+ // Log methods that take Throwables (Exceptions/Errors) too
+ protected def logInfo(msg: => String, throwable: Throwable) {
+ if (log.isInfoEnabled) log.info(msg, throwable)
+ }
+
+ protected def logDebug(msg: => String, throwable: Throwable) {
+ if (log.isDebugEnabled) log.debug(msg, throwable)
+ }
+
+ protected def logTrace(msg: => String, throwable: Throwable) {
+ if (log.isTraceEnabled) log.trace(msg, throwable)
+ }
+
+ protected def logWarning(msg: => String, throwable: Throwable) {
+ if (log.isWarnEnabled) log.warn(msg, throwable)
+ }
+
+ protected def logError(msg: => String, throwable: Throwable) {
+ if (log.isErrorEnabled) log.error(msg, throwable)
+ }
+
+ protected def isTraceEnabled(): Boolean = {
+ log.isTraceEnabled
+ }
+
+ private def initializeIfNecessary() {
+ if (!Logging.initialized) {
+ Logging.initLock.synchronized {
+ if (!Logging.initialized) {
+ initializeLogging()
+ }
+ }
+ }
+ }
+
+ private def initializeLogging() {
+ Logging.initialized = true
+
+ // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
+ // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
+ log
+ }
+}
+
+private[sink] object Logging {
+ @volatile private var initialized = false
+ val initLock = new Object()
+ try {
+ // We use reflection here to handle the case where users remove the
+ // slf4j-to-jul bridge order to route their logs to JUL.
+ // scalastyle:off classforname
+ val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
+ // scalastyle:on classforname
+ bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
+ val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
+ if (!installed) {
+ bridgeClass.getMethod("install").invoke(null)
+ }
+ } catch {
+ case e: ClassNotFoundException => // can't log anything yet so just fail silently
+ }
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
new file mode 100644
index 0000000000..719fca0938
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.util.UUID
+import java.util.concurrent.{CountDownLatch, Executors}
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable
+
+import org.apache.flume.Channel
+
+/**
+ * Class that implements the SparkFlumeProtocol, that is used by the Avro Netty Server to process
+ * requests. Each getEvents, ack and nack call is forwarded to an instance of this class.
+ * @param threads Number of threads to use to process requests.
+ * @param channel The channel that the sink pulls events from
+ * @param transactionTimeout Timeout in millis after which the transaction if not acked by Spark
+ * is rolled back.
+ */
+// Flume forces transactions to be thread-local. So each transaction *must* be committed, or
+// rolled back from the thread it was originally created in. So each getEvents call from Spark
+// creates a TransactionProcessor which runs in a new thread, in which the transaction is created
+// and events are pulled off the channel. Once the events are sent to spark,
+// that thread is blocked and the TransactionProcessor is saved in a map,
+// until an ACK or NACK comes back or the transaction times out (after the specified timeout).
+// When the response comes or a timeout is hit, the TransactionProcessor is retrieved and then
+// unblocked, at which point the transaction is committed or rolled back.
+
+private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
+ val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
+ val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
+ new SparkSinkThreadFactory("Spark Sink Processor Thread - %d")))
+ // Protected by `sequenceNumberToProcessor`
+ private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
+ // This sink will not persist sequence numbers and reuses them if it gets restarted.
+ // So it is possible to commit a transaction which may have been meant for the sink before the
+ // restart.
+ // Since the new txn may not have the same sequence number we must guard against accidentally
+ // committing a new transaction. To reduce the probability of that happening a random string is
+ // prepended to the sequence number. Does not change for life of sink
+ private val seqBase = UUID.randomUUID().toString.substring(0, 8)
+ private val seqCounter = new AtomicLong(0)
+
+ // Protected by `sequenceNumberToProcessor`
+ private var stopped = false
+
+ @volatile private var isTest = false
+ private var testLatch: CountDownLatch = null
+
+ /**
+ * Returns a bunch of events to Spark over Avro RPC.
+ * @param n Maximum number of events to return in a batch
+ * @return [[EventBatch]] instance that has a sequence number and an array of at most n events
+ */
+ override def getEventBatch(n: Int): EventBatch = {
+ logDebug("Got getEventBatch call from Spark.")
+ val sequenceNumber = seqBase + seqCounter.incrementAndGet()
+ createProcessor(sequenceNumber, n) match {
+ case Some(processor) =>
+ transactionExecutorOpt.foreach(_.submit(processor))
+ // Wait until a batch is available - will be an error if error message is non-empty
+ val batch = processor.getEventBatch
+ if (SparkSinkUtils.isErrorBatch(batch)) {
+ // Remove the processor if it is an error batch since no ACK is sent.
+ removeAndGetProcessor(sequenceNumber)
+ logWarning("Received an error batch - no events were received from channel! ")
+ }
+ batch
+ case None =>
+ new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
+ }
+ }
+
+ private def createProcessor(seq: String, n: Int): Option[TransactionProcessor] = {
+ sequenceNumberToProcessor.synchronized {
+ if (!stopped) {
+ val processor = new TransactionProcessor(
+ channel, seq, n, transactionTimeout, backOffInterval, this)
+ sequenceNumberToProcessor.put(seq, processor)
+ if (isTest) {
+ processor.countDownWhenBatchAcked(testLatch)
+ }
+ Some(processor)
+ } else {
+ None
+ }
+ }
+ }
+
+ /**
+ * Called by Spark to indicate successful commit of a batch
+ * @param sequenceNumber The sequence number of the event batch that was successful
+ */
+ override def ack(sequenceNumber: CharSequence): Void = {
+ logDebug("Received Ack for batch with sequence number: " + sequenceNumber)
+ completeTransaction(sequenceNumber, success = true)
+ null
+ }
+
+ /**
+ * Called by Spark to indicate failed commit of a batch
+ * @param sequenceNumber The sequence number of the event batch that failed
+ * @return
+ */
+ override def nack(sequenceNumber: CharSequence): Void = {
+ completeTransaction(sequenceNumber, success = false)
+ logInfo("Spark failed to commit transaction. Will reattempt events.")
+ null
+ }
+
+ /**
+ * Helper method to commit or rollback a transaction.
+ * @param sequenceNumber The sequence number of the batch that was completed
+ * @param success Whether the batch was successful or not.
+ */
+ private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
+ removeAndGetProcessor(sequenceNumber).foreach(processor => {
+ processor.batchProcessed(success)
+ })
+ }
+
+ /**
+ * Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak.
+ * @param sequenceNumber
+ * @return An `Option` of the transaction processor for the corresponding batch. Note that this
+ * instance is no longer tracked and the caller is responsible for that txn processor.
+ */
+ private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence):
+ Option[TransactionProcessor] = {
+ sequenceNumberToProcessor.synchronized {
+ sequenceNumberToProcessor.remove(sequenceNumber.toString)
+ }
+ }
+
+ private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
+ testLatch = latch
+ isTest = true
+ }
+
+ /**
+ * Shuts down the executor used to process transactions.
+ */
+ def shutdown() {
+ logInfo("Shutting down Spark Avro Callback Handler")
+ sequenceNumberToProcessor.synchronized {
+ stopped = true
+ sequenceNumberToProcessor.values.foreach(_.shutdown())
+ }
+ transactionExecutorOpt.foreach(_.shutdownNow())
+ }
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
new file mode 100644
index 0000000000..14dffb15fe
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.net.InetSocketAddress
+import java.util.concurrent._
+
+import org.apache.avro.ipc.NettyServer
+import org.apache.avro.ipc.specific.SpecificResponder
+import org.apache.flume.Context
+import org.apache.flume.Sink.Status
+import org.apache.flume.conf.{Configurable, ConfigurationException}
+import org.apache.flume.sink.AbstractSink
+
+/**
+ * A sink that uses Avro RPC to run a server that can be polled by Spark's
+ * FlumePollingInputDStream. This sink has the following configuration parameters:
+ *
+ * hostname - The hostname to bind to. Default: 0.0.0.0
+ * port - The port to bind to. (No default - mandatory)
+ * timeout - Time in seconds after which a transaction is rolled back,
+ * if an ACK is not received from Spark within that time
+ * threads - Number of threads to use to receive requests from Spark (Default: 10)
+ *
+ * This sink is unlike other Flume sinks in the sense that it does not push data,
+ * instead the process method in this sink simply blocks the SinkRunner the first time it is
+ * called. This sink starts up an Avro IPC server that uses the SparkFlumeProtocol.
+ *
+ * Each time a getEventBatch call comes, creates a transaction and reads events
+ * from the channel. When enough events are read, the events are sent to the Spark receiver and
+ * the thread itself is blocked and a reference to it saved off.
+ *
+ * When the ack for that batch is received,
+ * the thread which created the transaction is is retrieved and it commits the transaction with the
+ * channel from the same thread it was originally created in (since Flume transactions are
+ * thread local). If a nack is received instead, the sink rolls back the transaction. If no ack
+ * is received within the specified timeout, the transaction is rolled back too. If an ack comes
+ * after that, it is simply ignored and the events get re-sent.
+ *
+ */
+
+class SparkSink extends AbstractSink with Logging with Configurable {
+
+ // Size of the pool to use for holding transaction processors.
+ private var poolSize: Integer = SparkSinkConfig.DEFAULT_THREADS
+
+ // Timeout for each transaction. If spark does not respond in this much time,
+ // rollback the transaction
+ private var transactionTimeout = SparkSinkConfig.DEFAULT_TRANSACTION_TIMEOUT
+
+ // Address info to bind on
+ private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
+ private var port: Int = 0
+
+ private var backOffInterval: Int = 200
+
+ // Handle to the server
+ private var serverOpt: Option[NettyServer] = None
+
+ // The handler that handles the callback from Avro
+ private var handler: Option[SparkAvroCallbackHandler] = None
+
+ // Latch that blocks off the Flume framework from wasting 1 thread.
+ private val blockingLatch = new CountDownLatch(1)
+
+ override def start() {
+ logInfo("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " +
+ hostname + " with " + "pool size: " + poolSize + " and transaction timeout: " +
+ transactionTimeout + ".")
+ handler = Option(new SparkAvroCallbackHandler(poolSize, getChannel, transactionTimeout,
+ backOffInterval))
+ val responder = new SpecificResponder(classOf[SparkFlumeProtocol], handler.get)
+ // Using the constructor that takes specific thread-pools requires bringing in netty
+ // dependencies which are being excluded in the build. In practice,
+ // Netty dependencies are already available on the JVM as Flume would have pulled them in.
+ serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
+ serverOpt.foreach(server => {
+ logInfo("Starting Avro server for sink: " + getName)
+ server.start()
+ })
+ super.start()
+ }
+
+ override def stop() {
+ logInfo("Stopping Spark Sink: " + getName)
+ handler.foreach(callbackHandler => {
+ callbackHandler.shutdown()
+ })
+ serverOpt.foreach(server => {
+ logInfo("Stopping Avro Server for sink: " + getName)
+ server.close()
+ server.join()
+ })
+ blockingLatch.countDown()
+ super.stop()
+ }
+
+ override def configure(ctx: Context) {
+ import SparkSinkConfig._
+ hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
+ port = Option(ctx.getInteger(CONF_PORT)).
+ getOrElse(throw new ConfigurationException("The port to bind to must be specified"))
+ poolSize = ctx.getInteger(THREADS, DEFAULT_THREADS)
+ transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
+ backOffInterval = ctx.getInteger(CONF_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL)
+ logInfo("Configured Spark Sink with hostname: " + hostname + ", port: " + port + ", " +
+ "poolSize: " + poolSize + ", transactionTimeout: " + transactionTimeout + ", " +
+ "backoffInterval: " + backOffInterval)
+ }
+
+ override def process(): Status = {
+ // This method is called in a loop by the Flume framework - block it until the sink is
+ // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
+ // being shut down.
+ logInfo("Blocking Sink Runner, sink will continue to run..")
+ blockingLatch.await()
+ Status.BACKOFF
+ }
+
+ private[flume] def getPort(): Int = {
+ serverOpt
+ .map(_.getPort)
+ .getOrElse(
+ throw new RuntimeException("Server was not started!")
+ )
+ }
+
+ /**
+ * Pass in a [[CountDownLatch]] for testing purposes. This batch is counted down when each
+ * batch is received. The test can simply call await on this latch till the expected number of
+ * batches are received.
+ * @param latch
+ */
+ private[flume] def countdownWhenBatchReceived(latch: CountDownLatch) {
+ handler.foreach(_.countDownWhenBatchAcked(latch))
+ }
+}
+
+/**
+ * Configuration parameters and their defaults.
+ */
+private[flume]
+object SparkSinkConfig {
+ val THREADS = "threads"
+ val DEFAULT_THREADS = 10
+
+ val CONF_TRANSACTION_TIMEOUT = "timeout"
+ val DEFAULT_TRANSACTION_TIMEOUT = 60
+
+ val CONF_HOSTNAME = "hostname"
+ val DEFAULT_HOSTNAME = "0.0.0.0"
+
+ val CONF_PORT = "port"
+
+ val CONF_BACKOFF_INTERVAL = "backoffInterval"
+ val DEFAULT_BACKOFF_INTERVAL = 200
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
new file mode 100644
index 0000000000..845fc8debd
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.util.concurrent.ThreadFactory
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * Thread factory that generates daemon threads with a specified name format.
+ */
+private[sink] class SparkSinkThreadFactory(nameFormat: String) extends ThreadFactory {
+
+ private val threadId = new AtomicLong()
+
+ override def newThread(r: Runnable): Thread = {
+ val t = new Thread(r, nameFormat.format(threadId.incrementAndGet()))
+ t.setDaemon(true)
+ t
+ }
+
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
new file mode 100644
index 0000000000..47c0e294d6
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+private[flume] object SparkSinkUtils {
+ /**
+ * This method determines if this batch represents an error or not.
+ * @param batch - The batch to check
+ * @return - true if the batch represents an error
+ */
+ def isErrorBatch(batch: EventBatch): Boolean = {
+ !batch.getErrorMsg.toString.equals("") // If there is an error message, it is an error batch.
+ }
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
new file mode 100644
index 0000000000..b15c2097e5
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.concurrent.{Callable, CountDownLatch, TimeUnit}
+
+import scala.util.control.Breaks
+
+import org.apache.flume.{Channel, Transaction}
+
+// Flume forces transactions to be thread-local (horrible, I know!)
+// So the sink basically spawns a new thread to pull the events out within a transaction.
+// The thread fills in the event batch object that is set before the thread is scheduled.
+// After filling it in, the thread waits on a condition - which is released only
+// when the success message comes back for the specific sequence number for that event batch.
+/**
+ * This class represents a transaction on the Flume channel. This class runs a separate thread
+ * which owns the transaction. The thread is blocked until the success call for that transaction
+ * comes back with an ACK or NACK.
+ * @param channel The channel from which to pull events
+ * @param seqNum The sequence number to use for the transaction. Must be unique
+ * @param maxBatchSize The maximum number of events to process per batch
+ * @param transactionTimeout Time in seconds after which a transaction must be rolled back
+ * without waiting for an ACK from Spark
+ * @param parent The parent [[SparkAvroCallbackHandler]] instance, for reporting timeouts
+ */
+private class TransactionProcessor(val channel: Channel, val seqNum: String,
+ var maxBatchSize: Int, val transactionTimeout: Int, val backOffInterval: Int,
+ val parent: SparkAvroCallbackHandler) extends Callable[Void] with Logging {
+
+ // If a real batch is not returned, we always have to return an error batch.
+ @volatile private var eventBatch: EventBatch = new EventBatch("Unknown Error", "",
+ util.Collections.emptyList())
+
+ // Synchronization primitives
+ val batchGeneratedLatch = new CountDownLatch(1)
+ val batchAckLatch = new CountDownLatch(1)
+
+ // Sanity check to ensure we don't loop like crazy
+ val totalAttemptsToRemoveFromChannel = Int.MaxValue / 2
+
+ // OK to use volatile, since the change would only make this true (otherwise it will be
+ // changed to false - we never apply a negation operation to this) - which means the transaction
+ // succeeded.
+ @volatile private var batchSuccess = false
+
+ @volatile private var stopped = false
+
+ @volatile private var isTest = false
+
+ private var testLatch: CountDownLatch = null
+
+ // The transaction that this processor would handle
+ var txOpt: Option[Transaction] = None
+
+ /**
+ * Get an event batch from the channel. This method will block until a batch of events is
+ * available from the channel. If no events are available after a large number of attempts of
+ * polling the channel, this method will return an [[EventBatch]] with a non-empty error message
+ *
+ * @return An [[EventBatch]] instance with sequence number set to seqNum, filled with a
+ * maximum of maxBatchSize events
+ */
+ def getEventBatch: EventBatch = {
+ batchGeneratedLatch.await()
+ eventBatch
+ }
+
+ /**
+ * This method is to be called by the sink when it receives an ACK or NACK from Spark. This
+ * method is a no-op if it is called after transactionTimeout has expired since
+ * getEventBatch returned a batch of events.
+ * @param success True if an ACK was received and the transaction should be committed, else false.
+ */
+ def batchProcessed(success: Boolean) {
+ logDebug("Batch processed for sequence number: " + seqNum)
+ batchSuccess = success
+ batchAckLatch.countDown()
+ }
+
+ private[flume] def shutdown(): Unit = {
+ logDebug("Shutting down transaction processor")
+ stopped = true
+ }
+
+ /**
+ * Populates events into the event batch. If the batch cannot be populated,
+ * this method will not set the events into the event batch, but it sets an error message.
+ */
+ private def populateEvents() {
+ try {
+ txOpt = Option(channel.getTransaction)
+ if(txOpt.isEmpty) {
+ eventBatch.setErrorMsg("Something went wrong. Channel was " +
+ "unable to create a transaction!")
+ }
+ txOpt.foreach(tx => {
+ tx.begin()
+ val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
+ val loop = new Breaks
+ var gotEventsInThisTxn = false
+ var loopCounter: Int = 0
+ loop.breakable {
+ while (!stopped && events.size() < maxBatchSize
+ && loopCounter < totalAttemptsToRemoveFromChannel) {
+ loopCounter += 1
+ Option(channel.take()) match {
+ case Some(event) =>
+ events.add(new SparkSinkEvent(toCharSequenceMap(event.getHeaders),
+ ByteBuffer.wrap(event.getBody)))
+ gotEventsInThisTxn = true
+ case None =>
+ if (!gotEventsInThisTxn && !stopped) {
+ logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
+ " the current transaction")
+ TimeUnit.MILLISECONDS.sleep(backOffInterval)
+ } else {
+ loop.break()
+ }
+ }
+ }
+ }
+ if (!gotEventsInThisTxn && !stopped) {
+ val msg = "Tried several times, " +
+ "but did not get any events from the channel!"
+ logWarning(msg)
+ eventBatch.setErrorMsg(msg)
+ } else {
+ // At this point, the events are available, so fill them into the event batch
+ eventBatch = new EventBatch("", seqNum, events)
+ }
+ })
+ } catch {
+ case interrupted: InterruptedException =>
+ // Don't pollute logs if the InterruptedException came from this being stopped
+ if (!stopped) {
+ logWarning("Error while processing transaction.", interrupted)
+ }
+ case e: Exception =>
+ logWarning("Error while processing transaction.", e)
+ eventBatch.setErrorMsg(e.getMessage)
+ try {
+ txOpt.foreach(tx => {
+ rollbackAndClose(tx, close = true)
+ })
+ } finally {
+ txOpt = None
+ }
+ } finally {
+ batchGeneratedLatch.countDown()
+ }
+ }
+
+ /**
+ * Waits for upto transactionTimeout seconds for an ACK. If an ACK comes in
+ * this method commits the transaction with the channel. If the ACK does not come in within
+ * that time or a NACK comes in, this method rolls back the transaction.
+ */
+ private def processAckOrNack() {
+ batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
+ txOpt.foreach(tx => {
+ if (batchSuccess) {
+ try {
+ logDebug("Committing transaction")
+ tx.commit()
+ } catch {
+ case e: Exception =>
+ logWarning("Error while attempting to commit transaction. Transaction will be rolled " +
+ "back", e)
+ rollbackAndClose(tx, close = false) // tx will be closed later anyway
+ } finally {
+ tx.close()
+ if (isTest) {
+ testLatch.countDown()
+ }
+ }
+ } else {
+ logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.")
+ rollbackAndClose(tx, close = true)
+ // This might have been due to timeout or a NACK. Either way the following call does not
+ // cause issues. This is required to ensure the TransactionProcessor instance is not leaked
+ parent.removeAndGetProcessor(seqNum)
+ }
+ })
+ }
+
+ /**
+ * Helper method to rollback and optionally close a transaction
+ * @param tx The transaction to rollback
+ * @param close Whether the transaction should be closed or not after rolling back
+ */
+ private def rollbackAndClose(tx: Transaction, close: Boolean) {
+ try {
+ logWarning("Spark was unable to successfully process the events. Transaction is being " +
+ "rolled back.")
+ tx.rollback()
+ } catch {
+ case e: Exception =>
+ logError("Error rolling back transaction. Rollback may have failed!", e)
+ } finally {
+ if (close) {
+ tx.close()
+ }
+ }
+ }
+
+ /**
+ * Helper method to convert a Map[String, String] to Map[CharSequence, CharSequence]
+ * @param inMap The map to be converted
+ * @return The converted map
+ */
+ private def toCharSequenceMap(inMap: java.util.Map[String, String]): java.util.Map[CharSequence,
+ CharSequence] = {
+ val charSeqMap = new util.HashMap[CharSequence, CharSequence](inMap.size())
+ charSeqMap.putAll(inMap)
+ charSeqMap
+ }
+
+ /**
+ * When the thread is started it sets as many events as the batch size or less (if enough
+ * events aren't available) into the eventBatch and object and lets any threads waiting on the
+ * [[getEventBatch]] method to proceed. Then this thread waits for acks or nacks to come in,
+ * or for a specified timeout and commits or rolls back the transaction.
+ * @return
+ */
+ override def call(): Void = {
+ populateEvents()
+ processAckOrNack()
+ null
+ }
+
+ private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
+ testLatch = latch
+ isTest = true
+ }
+}
diff --git a/external/flume-sink/src/test/resources/log4j.properties b/external/flume-sink/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..42df8792f1
--- /dev/null
+++ b/external/flume-sink/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
+
diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
new file mode 100644
index 0000000000..e8ca1e7163
--- /dev/null
+++ b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.net.InetSocketAddress
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.flume.Context
+import org.apache.flume.channel.MemoryChannel
+import org.apache.flume.event.EventBuilder
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+
+// Due to MNG-1378, there is not a way to include test dependencies transitively.
+// We cannot include Spark core tests as a dependency here because it depends on
+// Spark core main, which has too many dependencies to require here manually.
+// For this reason, we continue to use FunSuite and ignore the scalastyle checks
+// that fail if this is detected.
+// scalastyle:off
+import org.scalatest.FunSuite
+
+class SparkSinkSuite extends FunSuite {
+// scalastyle:on
+
+ val eventsPerBatch = 1000
+ val channelCapacity = 5000
+
+ test("Success with ack") {
+ val (channel, sink, latch) = initializeChannelAndSink()
+ channel.start()
+ sink.start()
+
+ putEvents(channel, eventsPerBatch)
+
+ val port = sink.getPort
+ val address = new InetSocketAddress("0.0.0.0", port)
+
+ val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
+ val events = client.getEventBatch(1000)
+ client.ack(events.getSequenceNumber)
+ assert(events.getEvents.size() === 1000)
+ latch.await(1, TimeUnit.SECONDS)
+ assertChannelIsEmpty(channel)
+ sink.stop()
+ channel.stop()
+ transceiver.close()
+ }
+
+ test("Failure with nack") {
+ val (channel, sink, latch) = initializeChannelAndSink()
+ channel.start()
+ sink.start()
+ putEvents(channel, eventsPerBatch)
+
+ val port = sink.getPort
+ val address = new InetSocketAddress("0.0.0.0", port)
+
+ val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
+ val events = client.getEventBatch(1000)
+ assert(events.getEvents.size() === 1000)
+ client.nack(events.getSequenceNumber)
+ latch.await(1, TimeUnit.SECONDS)
+ assert(availableChannelSlots(channel) === 4000)
+ sink.stop()
+ channel.stop()
+ transceiver.close()
+ }
+
+ test("Failure with timeout") {
+ val (channel, sink, latch) = initializeChannelAndSink(Map(SparkSinkConfig
+ .CONF_TRANSACTION_TIMEOUT -> 1.toString))
+ channel.start()
+ sink.start()
+ putEvents(channel, eventsPerBatch)
+ val port = sink.getPort
+ val address = new InetSocketAddress("0.0.0.0", port)
+
+ val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
+ val events = client.getEventBatch(1000)
+ assert(events.getEvents.size() === 1000)
+ latch.await(1, TimeUnit.SECONDS)
+ assert(availableChannelSlots(channel) === 4000)
+ sink.stop()
+ channel.stop()
+ transceiver.close()
+ }
+
+ test("Multiple consumers") {
+ testMultipleConsumers(failSome = false)
+ }
+
+ test("Multiple consumers with some failures") {
+ testMultipleConsumers(failSome = true)
+ }
+
+ def testMultipleConsumers(failSome: Boolean): Unit = {
+ implicit val executorContext = ExecutionContext
+ .fromExecutorService(Executors.newFixedThreadPool(5))
+ val (channel, sink, latch) = initializeChannelAndSink(Map.empty, 5)
+ channel.start()
+ sink.start()
+ (1 to 5).foreach(_ => putEvents(channel, eventsPerBatch))
+ val port = sink.getPort
+ val address = new InetSocketAddress("0.0.0.0", port)
+ val transceiversAndClients = getTransceiverAndClient(address, 5)
+ val batchCounter = new CountDownLatch(5)
+ val counter = new AtomicInteger(0)
+ transceiversAndClients.foreach(x => {
+ Future {
+ val client = x._2
+ val events = client.getEventBatch(1000)
+ if (!failSome || counter.getAndIncrement() % 2 == 0) {
+ client.ack(events.getSequenceNumber)
+ } else {
+ client.nack(events.getSequenceNumber)
+ throw new RuntimeException("Sending NACK for failure!")
+ }
+ events
+ }.onComplete {
+ case Success(events) =>
+ assert(events.getEvents.size() === 1000)
+ batchCounter.countDown()
+ case Failure(t) =>
+ // Don't re-throw the exception, causes a nasty unnecessary stack trace on stdout
+ batchCounter.countDown()
+ }
+ })
+ batchCounter.await()
+ latch.await(1, TimeUnit.SECONDS)
+ executorContext.shutdown()
+ if(failSome) {
+ assert(availableChannelSlots(channel) === 3000)
+ } else {
+ assertChannelIsEmpty(channel)
+ }
+ sink.stop()
+ channel.stop()
+ transceiversAndClients.foreach(x => x._1.close())
+ }
+
+ private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty,
+ batchCounter: Int = 1): (MemoryChannel, SparkSink, CountDownLatch) = {
+ val channel = new MemoryChannel()
+ val channelContext = new Context()
+
+ channelContext.put("capacity", channelCapacity.toString)
+ channelContext.put("transactionCapacity", 1000.toString)
+ channelContext.put("keep-alive", 0.toString)
+ channelContext.putAll(overrides.asJava)
+ channel.setName(scala.util.Random.nextString(10))
+ channel.configure(channelContext)
+
+ val sink = new SparkSink()
+ val sinkContext = new Context()
+ sinkContext.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0")
+ sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
+ sink.configure(sinkContext)
+ sink.setChannel(channel)
+ val latch = new CountDownLatch(batchCounter)
+ sink.countdownWhenBatchReceived(latch)
+ (channel, sink, latch)
+ }
+
+ private def putEvents(ch: MemoryChannel, count: Int): Unit = {
+ val tx = ch.getTransaction
+ tx.begin()
+ (1 to count).foreach(x =>
+ ch.put(EventBuilder.withBody(x.toString.getBytes(StandardCharsets.UTF_8))))
+ tx.commit()
+ tx.close()
+ }
+
+ private def getTransceiverAndClient(address: InetSocketAddress,
+ count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
+
+ (1 to count).map(_ => {
+ lazy val channelFactoryExecutor = Executors.newCachedThreadPool(
+ new SparkSinkThreadFactory("Flume Receiver Channel Thread - %d"))
+ lazy val channelFactory =
+ new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
+ val transceiver = new NettyTransceiver(address, channelFactory)
+ val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+ (transceiver, client)
+ })
+ }
+
+ private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
+ assert(availableChannelSlots(channel) === channelCapacity)
+ }
+
+ private def availableChannelSlots(channel: MemoryChannel): Int = {
+ val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
+ queueRemaining.setAccessible(true)
+ val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
+ m.invoke(queueRemaining.get(channel)).asInstanceOf[Int]
+ }
+}
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
new file mode 100644
index 0000000000..d650dd034d
--- /dev/null
+++ b/external/flume/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume_2.11</artifactId>
+ <properties>
+ <sbt.project.name>streaming-flume</sbt.project.name>
+ </properties>
+ <packaging>jar</packaging>
+ <name>Spark Project External Flume</name>
+ <url>http://spark.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume-sink_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+</project>
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
new file mode 100644
index 0000000000..07c5286477
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.io.{ObjectInput, ObjectOutput}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * A simple object that provides the implementation of readExternal and writeExternal for both
+ * the wrapper classes for Flume-style Events.
+ */
+private[streaming] object EventTransformer extends Logging {
+ def readExternal(in: ObjectInput): (java.util.HashMap[CharSequence, CharSequence],
+ Array[Byte]) = {
+ val bodyLength = in.readInt()
+ val bodyBuff = new Array[Byte](bodyLength)
+ in.readFully(bodyBuff)
+
+ val numHeaders = in.readInt()
+ val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+ for (i <- 0 until numHeaders) {
+ val keyLength = in.readInt()
+ val keyBuff = new Array[Byte](keyLength)
+ in.readFully(keyBuff)
+ val key: String = Utils.deserialize(keyBuff)
+
+ val valLength = in.readInt()
+ val valBuff = new Array[Byte](valLength)
+ in.readFully(valBuff)
+ val value: String = Utils.deserialize(valBuff)
+
+ headers.put(key, value)
+ }
+ (headers, bodyBuff)
+ }
+
+ def writeExternal(out: ObjectOutput, headers: java.util.Map[CharSequence, CharSequence],
+ body: Array[Byte]) {
+ out.writeInt(body.length)
+ out.write(body)
+ val numHeaders = headers.size()
+ out.writeInt(numHeaders)
+ for ((k, v) <- headers.asScala) {
+ val keyBuff = Utils.serialize(k.toString)
+ out.writeInt(keyBuff.length)
+ out.write(keyBuff)
+ val valBuff = Utils.serialize(v.toString)
+ out.writeInt(valBuff.length)
+ out.write(valBuff)
+ }
+ }
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
new file mode 100644
index 0000000000..5f234b1f0c
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.base.Throwables
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.flume.sink._
+
+/**
+ * This class implements the core functionality of [[FlumePollingReceiver]]. When started it
+ * pulls data from Flume, stores it to Spark and then sends an Ack or Nack. This class should be
+ * run via an [[java.util.concurrent.Executor]] as this implements [[Runnable]]
+ *
+ * @param receiver The receiver that owns this instance.
+ */
+
+private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
+ Logging {
+
+ def run(): Unit = {
+ while (!receiver.isStopped()) {
+ val connection = receiver.getConnections.poll()
+ val client = connection.client
+ var batchReceived = false
+ var seq: CharSequence = null
+ try {
+ getBatch(client) match {
+ case Some(eventBatch) =>
+ batchReceived = true
+ seq = eventBatch.getSequenceNumber
+ val events = toSparkFlumeEvents(eventBatch.getEvents)
+ if (store(events)) {
+ sendAck(client, seq)
+ } else {
+ sendNack(batchReceived, client, seq)
+ }
+ case None =>
+ }
+ } catch {
+ case e: Exception =>
+ Throwables.getRootCause(e) match {
+ // If the cause was an InterruptedException, then check if the receiver is stopped -
+ // if yes, just break out of the loop. Else send a Nack and log a warning.
+ // In the unlikely case, the cause was not an Exception,
+ // then just throw it out and exit.
+ case interrupted: InterruptedException =>
+ if (!receiver.isStopped()) {
+ logWarning("Interrupted while receiving data from Flume", interrupted)
+ sendNack(batchReceived, client, seq)
+ }
+ case exception: Exception =>
+ logWarning("Error while receiving data from Flume", exception)
+ sendNack(batchReceived, client, seq)
+ }
+ } finally {
+ receiver.getConnections.add(connection)
+ }
+ }
+ }
+
+ /**
+ * Gets a batch of events from the specified client. This method does not handle any exceptions
+ * which will be propagated to the caller.
+ * @param client Client to get events from
+ * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
+ */
+ private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
+ val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
+ if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
+ // No error, proceed with processing data
+ logDebug(s"Received batch of ${eventBatch.getEvents.size} events with sequence " +
+ s"number: ${eventBatch.getSequenceNumber}")
+ Some(eventBatch)
+ } else {
+ logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
+ eventBatch.getErrorMsg)
+ None
+ }
+ }
+
+ /**
+ * Store the events in the buffer to Spark. This method will not propagate any exceptions,
+ * but will propagate any other errors.
+ * @param buffer The buffer to store
+ * @return true if the data was stored without any exception being thrown, else false
+ */
+ private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
+ try {
+ receiver.store(buffer)
+ true
+ } catch {
+ case e: Exception =>
+ logWarning("Error while attempting to store data received from Flume", e)
+ false
+ }
+ }
+
+ /**
+ * Send an ack to the client for the sequence number. This method does not handle any exceptions
+ * which will be propagated to the caller.
+ * @param client client to send the ack to
+ * @param seq sequence number of the batch to be ack-ed.
+ * @return
+ */
+ private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
+ logDebug("Sending ack for sequence number: " + seq)
+ client.ack(seq)
+ logDebug("Ack sent for sequence number: " + seq)
+ }
+
+ /**
+ * This method sends a Nack if a batch was received to the client with the given sequence
+ * number. Any exceptions thrown by the RPC call is simply thrown out as is - no effort is made
+ * to handle it.
+ * @param batchReceived true if a batch was received. If this is false, no nack is sent
+ * @param client The client to which the nack should be sent
+ * @param seq The sequence number of the batch that is being nack-ed.
+ */
+ private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
+ seq: CharSequence): Unit = {
+ if (batchReceived) {
+ // Let Flume know that the events need to be pushed back into the channel.
+ logDebug("Sending nack for sequence number: " + seq)
+ client.nack(seq) // If the agent is down, even this could fail and throw
+ logDebug("Nack sent for sequence number: " + seq)
+ }
+ }
+
+ /**
+ * Utility method to convert [[SparkSinkEvent]]s to [[SparkFlumeEvent]]s
+ * @param events - Events to convert to SparkFlumeEvents
+ * @return - The SparkFlumeEvent generated from SparkSinkEvent
+ */
+ private def toSparkFlumeEvents(events: java.util.List[SparkSinkEvent]):
+ ArrayBuffer[SparkFlumeEvent] = {
+ // Convert each Flume event to a serializable SparkFlumeEvent
+ val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
+ var j = 0
+ while (j < events.size()) {
+ val event = events.get(j)
+ val sparkFlumeEvent = new SparkFlumeEvent()
+ sparkFlumeEvent.event.setBody(event.getBody)
+ sparkFlumeEvent.event.setHeaders(event.getHeaders)
+ buffer += sparkFlumeEvent
+ j += 1
+ }
+ buffer
+ }
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
new file mode 100644
index 0000000000..7dc9606913
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.net.InetSocketAddress
+import java.nio.ByteBuffer
+import java.util.concurrent.Executors
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.avro.ipc.NettyServer
+import org.apache.avro.ipc.specific.SpecificResponder
+import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol, Status}
+import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
+
+private[streaming]
+class FlumeInputDStream[T: ClassTag](
+ _ssc: StreamingContext,
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
+
+ override def getReceiver(): Receiver[SparkFlumeEvent] = {
+ new FlumeReceiver(host, port, storageLevel, enableDecompression)
+ }
+}
+
+/**
+ * A wrapper class for AvroFlumeEvent's with a custom serialization format.
+ *
+ * This is necessary because AvroFlumeEvent uses inner data structures
+ * which are not serializable.
+ */
+class SparkFlumeEvent() extends Externalizable {
+ var event: AvroFlumeEvent = new AvroFlumeEvent()
+
+ /* De-serialize from bytes. */
+ def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
+ val bodyLength = in.readInt()
+ val bodyBuff = new Array[Byte](bodyLength)
+ in.readFully(bodyBuff)
+
+ val numHeaders = in.readInt()
+ val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+ for (i <- 0 until numHeaders) {
+ val keyLength = in.readInt()
+ val keyBuff = new Array[Byte](keyLength)
+ in.readFully(keyBuff)
+ val key: String = Utils.deserialize(keyBuff)
+
+ val valLength = in.readInt()
+ val valBuff = new Array[Byte](valLength)
+ in.readFully(valBuff)
+ val value: String = Utils.deserialize(valBuff)
+
+ headers.put(key, value)
+ }
+
+ event.setBody(ByteBuffer.wrap(bodyBuff))
+ event.setHeaders(headers)
+ }
+
+ /* Serialize to bytes. */
+ def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
+ val body = event.getBody
+ out.writeInt(body.remaining())
+ Utils.writeByteBuffer(body, out)
+
+ val numHeaders = event.getHeaders.size()
+ out.writeInt(numHeaders)
+ for ((k, v) <- event.getHeaders.asScala) {
+ val keyBuff = Utils.serialize(k.toString)
+ out.writeInt(keyBuff.length)
+ out.write(keyBuff)
+ val valBuff = Utils.serialize(v.toString)
+ out.writeInt(valBuff.length)
+ out.write(valBuff)
+ }
+ }
+}
+
+private[streaming] object SparkFlumeEvent {
+ def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent = {
+ val event = new SparkFlumeEvent
+ event.event = in
+ event
+ }
+}
+
+/** A simple server that implements Flume's Avro protocol. */
+private[streaming]
+class FlumeEventServer(receiver: FlumeReceiver) extends AvroSourceProtocol {
+ override def append(event: AvroFlumeEvent): Status = {
+ receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
+ Status.OK
+ }
+
+ override def appendBatch(events: java.util.List[AvroFlumeEvent]): Status = {
+ events.asScala.foreach(event => receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
+ Status.OK
+ }
+}
+
+/** A NetworkReceiver which listens for events using the
+ * Flume Avro interface. */
+private[streaming]
+class FlumeReceiver(
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+ ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
+
+ lazy val responder = new SpecificResponder(
+ classOf[AvroSourceProtocol], new FlumeEventServer(this))
+ var server: NettyServer = null
+
+ private def initServer() = {
+ if (enableDecompression) {
+ val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool())
+ val channelPipelineFactory = new CompressionChannelPipelineFactory()
+
+ new NettyServer(
+ responder,
+ new InetSocketAddress(host, port),
+ channelFactory,
+ channelPipelineFactory,
+ null)
+ } else {
+ new NettyServer(responder, new InetSocketAddress(host, port))
+ }
+ }
+
+ def onStart() {
+ synchronized {
+ if (server == null) {
+ server = initServer()
+ server.start()
+ } else {
+ logWarning("Flume receiver being asked to start more then once with out close")
+ }
+ }
+ logInfo("Flume receiver started")
+ }
+
+ def onStop() {
+ synchronized {
+ if (server != null) {
+ server.close()
+ server = null
+ }
+ }
+ logInfo("Flume receiver stopped")
+ }
+
+ override def preferredLocation: Option[String] = Option(host)
+
+ /** A Netty Pipeline factory that will decompress incoming data from
+ * and the Netty client and compress data going back to the client.
+ *
+ * The compression on the return is required because Flume requires
+ * a successful response to indicate it can remove the event/batch
+ * from the configured channel
+ */
+ private[streaming]
+ class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+ def getPipeline(): ChannelPipeline = {
+ val pipeline = Channels.pipeline()
+ val encoder = new ZlibEncoder(6)
+ pipeline.addFirst("deflater", encoder)
+ pipeline.addFirst("inflater", new ZlibDecoder())
+ pipeline
+ }
+ }
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
new file mode 100644
index 0000000000..250bfc1718
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume
+
+
+import java.net.InetSocketAddress
+import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.flume.sink._
+import org.apache.spark.streaming.receiver.Receiver
+
+/**
+ * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running
+ * [[org.apache.spark.streaming.flume.sink.SparkSink]]s.
+ * @param _ssc Streaming context that will execute this input stream
+ * @param addresses List of addresses at which SparkSinks are listening
+ * @param maxBatchSize Maximum size of a batch
+ * @param parallelism Number of parallel connections to open
+ * @param storageLevel The storage level to use.
+ * @tparam T Class type of the object of this stream
+ */
+private[streaming] class FlumePollingInputDStream[T: ClassTag](
+ _ssc: StreamingContext,
+ val addresses: Seq[InetSocketAddress],
+ val maxBatchSize: Int,
+ val parallelism: Int,
+ storageLevel: StorageLevel
+ ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
+
+ override def getReceiver(): Receiver[SparkFlumeEvent] = {
+ new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
+ }
+}
+
+private[streaming] class FlumePollingReceiver(
+ addresses: Seq[InetSocketAddress],
+ maxBatchSize: Int,
+ parallelism: Int,
+ storageLevel: StorageLevel
+ ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
+
+ lazy val channelFactoryExecutor =
+ Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
+ setNameFormat("Flume Receiver Channel Thread - %d").build())
+
+ lazy val channelFactory =
+ new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
+
+ lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())
+
+ private lazy val connections = new LinkedBlockingQueue[FlumeConnection]()
+
+ override def onStart(): Unit = {
+ // Create the connections to each Flume agent.
+ addresses.foreach(host => {
+ val transceiver = new NettyTransceiver(host, channelFactory)
+ val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+ connections.add(new FlumeConnection(transceiver, client))
+ })
+ for (i <- 0 until parallelism) {
+ logInfo("Starting Flume Polling Receiver worker threads..")
+ // Threads that pull data from Flume.
+ receiverExecutor.submit(new FlumeBatchFetcher(this))
+ }
+ }
+
+ override def onStop(): Unit = {
+ logInfo("Shutting down Flume Polling Receiver")
+ receiverExecutor.shutdown()
+ // Wait upto a minute for the threads to die
+ if (!receiverExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+ receiverExecutor.shutdownNow()
+ }
+ connections.asScala.foreach(_.transceiver.close())
+ channelFactory.releaseExternalResources()
+ }
+
+ private[flume] def getConnections: LinkedBlockingQueue[FlumeConnection] = {
+ this.connections
+ }
+
+ private[flume] def getMaxBatchSize: Int = {
+ this.maxBatchSize
+ }
+}
+
+/**
+ * A wrapper around the transceiver and the Avro IPC API.
+ * @param transceiver The transceiver to use for communication with Flume
+ * @param client The client that the callbacks are received on.
+ */
+private[flume] class FlumeConnection(val transceiver: NettyTransceiver,
+ val client: SparkFlumeProtocol.Callback)
+
+
+
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
new file mode 100644
index 0000000000..945cfa7295
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.net.{InetSocketAddress, ServerSocket}
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.util.{List => JList}
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.commons.lang3.RandomUtils
+import org.apache.flume.source.avro
+import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}
+
+import org.apache.spark.util.Utils
+import org.apache.spark.SparkConf
+
+/**
+ * Share codes for Scala and Python unit tests
+ */
+private[flume] class FlumeTestUtils {
+
+ private var transceiver: NettyTransceiver = null
+
+ private val testPort: Int = findFreePort()
+
+ def getTestPort(): Int = testPort
+
+ /** Find a free port */
+ private def findFreePort(): Int = {
+ val candidatePort = RandomUtils.nextInt(1024, 65536)
+ Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
+ val socket = new ServerSocket(trialPort)
+ socket.close()
+ (null, trialPort)
+ }, new SparkConf())._2
+ }
+
+ /** Send data to the flume receiver */
+ def writeInput(input: JList[String], enableCompression: Boolean): Unit = {
+ val testAddress = new InetSocketAddress("localhost", testPort)
+
+ val inputEvents = input.asScala.map { item =>
+ val event = new AvroFlumeEvent
+ event.setBody(ByteBuffer.wrap(item.getBytes(StandardCharsets.UTF_8)))
+ event.setHeaders(Collections.singletonMap("test", "header"))
+ event
+ }
+
+ // if last attempted transceiver had succeeded, close it
+ close()
+
+ // Create transceiver
+ transceiver = {
+ if (enableCompression) {
+ new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
+ } else {
+ new NettyTransceiver(testAddress)
+ }
+ }
+
+ // Create Avro client with the transceiver
+ val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
+ if (client == null) {
+ throw new AssertionError("Cannot create client")
+ }
+
+ // Send data
+ val status = client.appendBatch(inputEvents.asJava)
+ if (status != avro.Status.OK) {
+ throw new AssertionError("Sent events unsuccessfully")
+ }
+ }
+
+ def close(): Unit = {
+ if (transceiver != null) {
+ transceiver.close()
+ transceiver = null
+ }
+ }
+
+ /** Class to create socket channel with compression */
+ private class CompressionChannelFactory(compressionLevel: Int)
+ extends NioClientSocketChannelFactory {
+
+ override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+ val encoder = new ZlibEncoder(compressionLevel)
+ pipeline.addFirst("deflater", encoder)
+ pipeline.addFirst("inflater", new ZlibDecoder())
+ super.newChannel(pipeline)
+ }
+ }
+
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
new file mode 100644
index 0000000000..3e3ed712f0
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.io.{ByteArrayOutputStream, DataOutputStream}
+import java.net.InetSocketAddress
+import java.util.{List => JList, Map => JMap}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.api.java.function.PairFunction
+import org.apache.spark.api.python.PythonRDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+object FlumeUtils {
+ private val DEFAULT_POLLING_PARALLELISM = 5
+ private val DEFAULT_POLLING_BATCH_SIZE = 1000
+
+ /**
+ * Create a input stream from a Flume source.
+ * @param ssc StreamingContext object
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream (
+ ssc: StreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ createStream(ssc, hostname, port, storageLevel, false)
+ }
+
+ /**
+ * Create a input stream from a Flume source.
+ * @param ssc StreamingContext object
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param enableDecompression should netty server decompress input stream
+ */
+ def createStream (
+ ssc: StreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ val inputStream = new FlumeInputDStream[SparkFlumeEvent](
+ ssc, hostname, port, storageLevel, enableDecompression)
+
+ inputStream
+ }
+
+ /**
+ * Creates a input stream from a Flume source.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createStream(jssc.ssc, hostname, port)
+ }
+
+ /**
+ * Creates a input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createStream(jssc.ssc, hostname, port, storageLevel, false)
+ }
+
+ /**
+ * Creates a input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param enableDecompression should netty server decompress input stream
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param hostname Address of the host on which the Spark Sink is running
+ * @param port Port of the host at which the Spark Sink is listening
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createPollingStream(
+ ssc: StreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param addresses List of InetSocketAddresses representing the hosts to connect to.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createPollingStream(
+ ssc: StreamingContext,
+ addresses: Seq[InetSocketAddress],
+ storageLevel: StorageLevel
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(ssc, addresses, storageLevel,
+ DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * @param addresses List of InetSocketAddresses representing the hosts to connect to.
+ * @param maxBatchSize Maximum number of events to be pulled from the Spark sink in a
+ * single RPC call
+ * @param parallelism Number of concurrent requests this stream should send to the sink. Note
+ * that having a higher number of requests concurrently being pulled will
+ * result in this stream using more threads
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createPollingStream(
+ ssc: StreamingContext,
+ addresses: Seq[InetSocketAddress],
+ storageLevel: StorageLevel,
+ maxBatchSize: Int,
+ parallelism: Int
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
+ parallelism, storageLevel)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param hostname Hostname of the host on which the Spark Sink is running
+ * @param port Port of the host at which the Spark Sink is listening
+ */
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param hostname Hostname of the host on which the Spark Sink is running
+ * @param port Port of the host at which the Spark Sink is listening
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param addresses List of InetSocketAddresses on which the Spark Sink is running.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ addresses: Array[InetSocketAddress],
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc, addresses, storageLevel,
+ DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * @param addresses List of InetSocketAddresses on which the Spark Sink is running
+ * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
+ * single RPC call
+ * @param parallelism Number of concurrent requests this stream should send to the sink. Note
+ * that having a higher number of requests concurrently being pulled will
+ * result in this stream using more threads
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ addresses: Array[InetSocketAddress],
+ storageLevel: StorageLevel,
+ maxBatchSize: Int,
+ parallelism: Int
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
+ }
+}
+
+/**
+ * This is a helper class that wraps the methods in FlumeUtils into more Python-friendly class and
+ * function so that it can be easily instantiated and called from Python's FlumeUtils.
+ */
+private[flume] class FlumeUtilsPythonHelper {
+
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+ ): JavaPairDStream[Array[Byte], Array[Byte]] = {
+ val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression)
+ FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
+ }
+
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ hosts: JList[String],
+ ports: JList[Int],
+ storageLevel: StorageLevel,
+ maxBatchSize: Int,
+ parallelism: Int
+ ): JavaPairDStream[Array[Byte], Array[Byte]] = {
+ assert(hosts.size() == ports.size())
+ val addresses = hosts.asScala.zip(ports.asScala).map {
+ case (host, port) => new InetSocketAddress(host, port)
+ }
+ val dstream = FlumeUtils.createPollingStream(
+ jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
+ FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
+ }
+
+}
+
+private object FlumeUtilsPythonHelper {
+
+ private def stringMapToByteArray(map: JMap[CharSequence, CharSequence]): Array[Byte] = {
+ val byteStream = new ByteArrayOutputStream()
+ val output = new DataOutputStream(byteStream)
+ try {
+ output.writeInt(map.size)
+ map.asScala.foreach { kv =>
+ PythonRDD.writeUTF(kv._1.toString, output)
+ PythonRDD.writeUTF(kv._2.toString, output)
+ }
+ byteStream.toByteArray
+ }
+ finally {
+ output.close()
+ }
+ }
+
+ private def toByteArrayPairDStream(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
+ JavaPairDStream[Array[Byte], Array[Byte]] = {
+ dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] {
+ override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = {
+ val event = sparkEvent.event
+ val byteBuffer = event.getBody
+ val body = new Array[Byte](byteBuffer.remaining())
+ byteBuffer.get(body)
+ (stringMapToByteArray(event.getHeaders), body)
+ }
+ })
+ }
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
new file mode 100644
index 0000000000..1a96df6e94
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.nio.charset.StandardCharsets
+import java.util.{Collections, List => JList, Map => JMap}
+import java.util.concurrent._
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.flume.event.EventBuilder
+import org.apache.flume.Context
+import org.apache.flume.channel.MemoryChannel
+import org.apache.flume.conf.Configurables
+
+import org.apache.spark.streaming.flume.sink.{SparkSink, SparkSinkConfig}
+
+/**
+ * Share codes for Scala and Python unit tests
+ */
+private[flume] class PollingFlumeTestUtils {
+
+ private val batchCount = 5
+ val eventsPerBatch = 100
+ private val totalEventsPerChannel = batchCount * eventsPerBatch
+ private val channelCapacity = 5000
+
+ def getTotalEvents: Int = totalEventsPerChannel * channels.size
+
+ private val channels = new ArrayBuffer[MemoryChannel]
+ private val sinks = new ArrayBuffer[SparkSink]
+
+ /**
+ * Start a sink and return the port of this sink
+ */
+ def startSingleSink(): Int = {
+ channels.clear()
+ sinks.clear()
+
+ // Start the channel and sink.
+ val context = new Context()
+ context.put("capacity", channelCapacity.toString)
+ context.put("transactionCapacity", "1000")
+ context.put("keep-alive", "0")
+ val channel = new MemoryChannel()
+ Configurables.configure(channel, context)
+
+ val sink = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+ Configurables.configure(sink, context)
+ sink.setChannel(channel)
+ sink.start()
+
+ channels += (channel)
+ sinks += sink
+
+ sink.getPort()
+ }
+
+ /**
+ * Start 2 sinks and return the ports
+ */
+ def startMultipleSinks(): Seq[Int] = {
+ channels.clear()
+ sinks.clear()
+
+ // Start the channel and sink.
+ val context = new Context()
+ context.put("capacity", channelCapacity.toString)
+ context.put("transactionCapacity", "1000")
+ context.put("keep-alive", "0")
+ val channel = new MemoryChannel()
+ Configurables.configure(channel, context)
+
+ val channel2 = new MemoryChannel()
+ Configurables.configure(channel2, context)
+
+ val sink = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+ Configurables.configure(sink, context)
+ sink.setChannel(channel)
+ sink.start()
+
+ val sink2 = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+ Configurables.configure(sink2, context)
+ sink2.setChannel(channel2)
+ sink2.start()
+
+ sinks += sink
+ sinks += sink2
+ channels += channel
+ channels += channel2
+
+ sinks.map(_.getPort())
+ }
+
+ /**
+ * Send data and wait until all data has been received
+ */
+ def sendDatAndEnsureAllDataHasBeenReceived(): Unit = {
+ val executor = Executors.newCachedThreadPool()
+ val executorCompletion = new ExecutorCompletionService[Void](executor)
+
+ val latch = new CountDownLatch(batchCount * channels.size)
+ sinks.foreach(_.countdownWhenBatchReceived(latch))
+
+ channels.foreach(channel => {
+ executorCompletion.submit(new TxnSubmitter(channel))
+ })
+
+ for (i <- 0 until channels.size) {
+ executorCompletion.take()
+ }
+
+ latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
+ }
+
+ /**
+ * A Python-friendly method to assert the output
+ */
+ def assertOutput(
+ outputHeaders: JList[JMap[String, String]], outputBodies: JList[String]): Unit = {
+ require(outputHeaders.size == outputBodies.size)
+ val eventSize = outputHeaders.size
+ if (eventSize != totalEventsPerChannel * channels.size) {
+ throw new AssertionError(
+ s"Expected ${totalEventsPerChannel * channels.size} events, but was $eventSize")
+ }
+ var counter = 0
+ for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
+ val eventBodyToVerify = s"${channels(k).getName}-$i"
+ val eventHeaderToVerify: JMap[String, String] = Collections.singletonMap(s"test-$i", "header")
+ var found = false
+ var j = 0
+ while (j < eventSize && !found) {
+ if (eventBodyToVerify == outputBodies.get(j) &&
+ eventHeaderToVerify == outputHeaders.get(j)) {
+ found = true
+ counter += 1
+ }
+ j += 1
+ }
+ }
+ if (counter != totalEventsPerChannel * channels.size) {
+ throw new AssertionError(
+ s"111 Expected ${totalEventsPerChannel * channels.size} events, but was $counter")
+ }
+ }
+
+ def assertChannelsAreEmpty(): Unit = {
+ channels.foreach(assertChannelIsEmpty)
+ }
+
+ private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
+ val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
+ queueRemaining.setAccessible(true)
+ val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
+ if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != 5000) {
+ throw new AssertionError(s"Channel ${channel.getName} is not empty")
+ }
+ }
+
+ def close(): Unit = {
+ sinks.foreach(_.stop())
+ sinks.clear()
+ channels.foreach(_.stop())
+ channels.clear()
+ }
+
+ private class TxnSubmitter(channel: MemoryChannel) extends Callable[Void] {
+ override def call(): Void = {
+ var t = 0
+ for (i <- 0 until batchCount) {
+ val tx = channel.getTransaction
+ tx.begin()
+ for (j <- 0 until eventsPerBatch) {
+ channel.put(EventBuilder.withBody(
+ s"${channel.getName}-$t".getBytes(StandardCharsets.UTF_8),
+ Collections.singletonMap(s"test-$t", "header")))
+ t += 1
+ }
+ tx.commit()
+ tx.close()
+ Thread.sleep(500) // Allow some time for the events to reach
+ }
+ null
+ }
+ }
+
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
new file mode 100644
index 0000000000..d31aa5f5c0
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Spark streaming receiver for Flume.
+ */
+package org.apache.spark.streaming.flume; \ No newline at end of file
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
new file mode 100644
index 0000000000..9bfab68c4b
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+/**
+ * Spark streaming receiver for Flume.
+ */
+package object flume
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000000..cfedb5a042
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+
+ protected transient JavaStreamingContext ssc;
+
+ @Before
+ public void setUp() {
+ SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
+ ssc = new JavaStreamingContext(conf, new Duration(1000));
+ ssc.checkpoint("checkpoint");
+ }
+
+ @After
+ public void tearDown() {
+ ssc.stop();
+ ssc = null;
+ }
+}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
new file mode 100644
index 0000000000..79c5b91654
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume;
+
+import java.net.InetSocketAddress;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.junit.Test;
+
+public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testFlumeStream() {
+ // tests the API, does not actually test data receiving
+ InetSocketAddress[] addresses = new InetSocketAddress[] {
+ new InetSocketAddress("localhost", 12345)
+ };
+ JavaReceiverInputDStream<SparkFlumeEvent> test1 =
+ FlumeUtils.createPollingStream(ssc, "localhost", 12345);
+ JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createPollingStream(
+ ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createPollingStream(
+ ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<SparkFlumeEvent> test4 = FlumeUtils.createPollingStream(
+ ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5);
+ }
+}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
new file mode 100644
index 0000000000..3b5e0c7746
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.junit.Test;
+
+public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testFlumeStream() {
+ // tests the API, does not actually test data receiving
+ JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
+ JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
+ StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
+ StorageLevel.MEMORY_AND_DISK_SER_2(), false);
+ }
+}
diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..75e3b53a09
--- /dev/null
+++ b/external/flume/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
+
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
new file mode 100644
index 0000000000..c97a27ca7c
--- /dev/null
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
+import org.apache.spark.util.Utils
+
+/**
+ * This is a output stream just for the testsuites. All the output is collected into a
+ * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+ *
+ * The buffer contains a sequence of RDD's, each containing a sequence of items
+ */
+class TestOutputStream[T: ClassTag](parent: DStream[T],
+ val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]())
+ extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
+ val collected = rdd.collect()
+ output.add(collected)
+ }, false) {
+
+ // This is to clear the output buffer every it is read from a checkpoint
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
+ ois.defaultReadObject()
+ output.clear()
+ }
+}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
new file mode 100644
index 0000000000..156712483d
--- /dev/null
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.net.InetSocketAddress
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.util.{ManualClock, Utils}
+
+class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
+
+ val maxAttempts = 5
+ val batchDuration = Seconds(1)
+
+ val conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
+
+ val utils = new PollingFlumeTestUtils
+
+ test("flume polling test") {
+ testMultipleTimes(testFlumePolling)
+ }
+
+ test("flume polling test multiple hosts") {
+ testMultipleTimes(testFlumePollingMultipleHost)
+ }
+
+ /**
+ * Run the given test until no more java.net.BindException's are thrown.
+ * Do this only up to a certain attempt limit.
+ */
+ private def testMultipleTimes(test: () => Unit): Unit = {
+ var testPassed = false
+ var attempt = 0
+ while (!testPassed && attempt < maxAttempts) {
+ try {
+ test()
+ testPassed = true
+ } catch {
+ case e: Exception if Utils.isBindCollision(e) =>
+ logWarning("Exception when running flume polling test: " + e)
+ attempt += 1
+ }
+ }
+ assert(testPassed, s"Test failed after $attempt attempts!")
+ }
+
+ private def testFlumePolling(): Unit = {
+ try {
+ val port = utils.startSingleSink()
+
+ writeAndVerify(Seq(port))
+ utils.assertChannelsAreEmpty()
+ } finally {
+ utils.close()
+ }
+ }
+
+ private def testFlumePollingMultipleHost(): Unit = {
+ try {
+ val ports = utils.startMultipleSinks()
+ writeAndVerify(ports)
+ utils.assertChannelsAreEmpty()
+ } finally {
+ utils.close()
+ }
+ }
+
+ def writeAndVerify(sinkPorts: Seq[Int]): Unit = {
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(conf, batchDuration)
+ val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port))
+ val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+ FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
+ utils.eventsPerBatch, 5)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputQueue)
+ outputStream.register()
+
+ ssc.start()
+ try {
+ utils.sendDatAndEnsureAllDataHasBeenReceived()
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ clock.advance(batchDuration.milliseconds)
+
+ // The eventually is required to ensure that all data in the batch has been processed.
+ eventually(timeout(10 seconds), interval(100 milliseconds)) {
+ val flattenOutput = outputQueue.asScala.toSeq.flatten
+ val headers = flattenOutput.map(_.event.getHeaders.asScala.map {
+ case (key, value) => (key.toString, value.toString)
+ }).map(_.asJava)
+ val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody))
+ utils.assertOutput(headers.asJava, bodies.asJava)
+ }
+ } finally {
+ ssc.stop()
+ }
+ }
+
+}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
new file mode 100644
index 0000000000..7bac1cc4b0
--- /dev/null
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+import org.scalatest.{BeforeAndAfter, Matchers}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
+
+class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
+ val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
+ var ssc: StreamingContext = null
+
+ test("flume input stream") {
+ testFlumeStream(testCompression = false)
+ }
+
+ test("flume input compressed stream") {
+ testFlumeStream(testCompression = true)
+ }
+
+ /** Run test on flume stream */
+ private def testFlumeStream(testCompression: Boolean): Unit = {
+ val input = (1 to 100).map { _.toString }
+ val utils = new FlumeTestUtils
+ try {
+ val outputQueue = startContext(utils.getTestPort(), testCompression)
+
+ eventually(timeout(10 seconds), interval(100 milliseconds)) {
+ utils.writeInput(input.asJava, testCompression)
+ }
+
+ eventually(timeout(10 seconds), interval(100 milliseconds)) {
+ val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event }
+ outputEvents.foreach {
+ event =>
+ event.getHeaders.get("test") should be("header")
+ }
+ val output = outputEvents.map(event => JavaUtils.bytesToString(event.getBody))
+ output should be (input)
+ }
+ } finally {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ utils.close()
+ }
+ }
+
+ /** Setup and start the streaming context */
+ private def startContext(
+ testPort: Int, testCompression: Boolean): (ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]) = {
+ ssc = new StreamingContext(conf, Milliseconds(200))
+ val flumeStream = FlumeUtils.createStream(
+ ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputQueue)
+ outputStream.register()
+ ssc.start()
+ outputQueue
+ }
+
+ /** Class to create socket channel with compression */
+ private class CompressionChannelFactory(compressionLevel: Int)
+ extends NioClientSocketChannelFactory {
+
+ override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+ val encoder = new ZlibEncoder(compressionLevel)
+ pipeline.addFirst("deflater", encoder)
+ pipeline.addFirst("inflater", new ZlibDecoder())
+ super.newChannel(pipeline)
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 92a32e7797..b4cfa3a598 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,6 +101,9 @@
<module>sql/hive</module>
<module>external/docker-integration-tests</module>
<module>assembly</module>
+ <module>external/flume</module>
+ <module>external/flume-sink</module>
+ <module>external/flume-assembly</module>
<module>examples</module>
<module>repl</module>
<module>launcher</module>
@@ -123,6 +126,7 @@
<yarn.version>${hadoop.version}</yarn.version>
<hbase.version>0.98.17-hadoop2</hbase.version>
<hbase.artifact>hbase</hbase.artifact>
+ <flume.version>1.6.0</flume.version>
<zookeeper.version>3.4.5</zookeeper.version>
<curator.version>2.4.0</curator.version>
<hive.group>org.spark-project.hive</hive.group>
@@ -189,6 +193,7 @@
during compilation if the dependency is transivite (e.g. "graphx/" depending on "core/" and
needing Hadoop classes in the classpath to compile).
-->
+ <flume.deps.scope>compile</flume.deps.scope>
<hadoop.deps.scope>compile</hadoop.deps.scope>
<hbase.deps.scope>compile</hbase.deps.scope>
<hive.deps.scope>compile</hive.deps.scope>
@@ -1591,6 +1596,46 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ <version>${flume.version}</version>
+ <scope>${flume.deps.scope}</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-auth</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ <version>${flume.version}</version>
+ <scope>${flume.deps.scope}</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>${calcite.version}</version>
@@ -2442,6 +2487,9 @@
that does not have them.
-->
<profile>
+ <id>flume-provided</id>
+ </profile>
+ <profile>
<id>hadoop-provided</id>
</profile>
<profile>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index dbe98d1e14..fb229b979d 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -39,9 +39,9 @@ object BuildCommons {
).map(ProjectRef(buildLocation, _))
val streamingProjects@Seq(
- streaming, streamingKafka
+ streaming, streamingFlumeSink, streamingFlume, streamingKafka
) = Seq(
- "streaming", "streaming-kafka"
+ "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka"
).map(ProjectRef(buildLocation, _))
val allProjects@Seq(
@@ -56,8 +56,8 @@ object BuildCommons {
Seq("yarn", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl",
"docker-integration-tests").map(ProjectRef(buildLocation, _))
- val assemblyProjects@Seq(assembly, networkYarn, streamingKafkaAssembly, streamingKinesisAslAssembly) =
- Seq("assembly", "network-yarn", "streaming-kafka-assembly", "streaming-kinesis-asl-assembly")
+ val assemblyProjects@Seq(assembly, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKinesisAslAssembly) =
+ Seq("assembly", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-kinesis-asl-assembly")
.map(ProjectRef(buildLocation, _))
val copyJarsProjects@Seq(examples) = Seq("examples").map(ProjectRef(buildLocation, _))
@@ -283,6 +283,8 @@ object SparkBuild extends PomBuild {
/* Hive console settings */
enable(Hive.settings)(hive)
+ enable(Flume.settings)(streamingFlumeSink)
+
enable(Java8TestSettings.settings)(java8Tests)
enable(DockerIntegrationTests.settings)(dockerIntegrationTests)
@@ -348,6 +350,10 @@ object Unsafe {
)
}
+object Flume {
+ lazy val settings = sbtavro.SbtAvro.avroSettings
+}
+
object DockerIntegrationTests {
// This serves to override the override specified in DependencyOverrides:
lazy val settings = Seq(
@@ -526,7 +532,7 @@ object Assembly {
.getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
},
jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
- if (mName.contains("streaming-kafka-assembly") || mName.contains("streaming-kinesis-asl-assembly")) {
+ if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly") || mName.contains("streaming-kinesis-asl-assembly")) {
// This must match the same name used in maven (see external/kafka-assembly/pom.xml)
s"${mName}-${v}.jar"
} else {
@@ -644,9 +650,9 @@ object Unidoc {
publish := {},
unidocProjectFilter in(ScalaUnidoc, unidoc) :=
- inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, yarn, testTags),
+ inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, testTags),
unidocProjectFilter in(JavaUnidoc, unidoc) :=
- inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, yarn, testTags),
+ inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, testTags),
// Skip actual catalyst, but include the subproject.
// Catalyst is not public API and contains quasiquotes which break scaladoc.
@@ -665,7 +671,7 @@ object Unidoc {
"-public",
"-group", "Core Java API", packageList("api.java", "api.java.function"),
"-group", "Spark Streaming", packageList(
- "streaming.api.java", "streaming.kafka", "streaming.kinesis"
+ "streaming.api.java", "streaming.flume", "streaming.kafka", "streaming.kinesis"
),
"-group", "MLlib", packageList(
"mllib.classification", "mllib.clustering", "mllib.evaluation.binary", "mllib.linalg",
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
new file mode 100644
index 0000000000..cd30483fc6
--- /dev/null
+++ b/python/pyspark/streaming/flume.py
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+if sys.version >= "3":
+ from io import BytesIO
+else:
+ from StringIO import StringIO
+from py4j.protocol import Py4JJavaError
+
+from pyspark.storagelevel import StorageLevel
+from pyspark.serializers import PairDeserializer, NoOpSerializer, UTF8Deserializer, read_int
+from pyspark.streaming import DStream
+
+__all__ = ['FlumeUtils', 'utf8_decoder']
+
+
+def utf8_decoder(s):
+ """ Decode the unicode as UTF-8 """
+ if s is None:
+ return None
+ return s.decode('utf-8')
+
+
+class FlumeUtils(object):
+
+ @staticmethod
+ def createStream(ssc, hostname, port,
+ storageLevel=StorageLevel.MEMORY_AND_DISK_2,
+ enableDecompression=False,
+ bodyDecoder=utf8_decoder):
+ """
+ Create an input stream that pulls events from Flume.
+
+ :param ssc: StreamingContext object
+ :param hostname: Hostname of the slave machine to which the flume data will be sent
+ :param port: Port of the slave machine to which the flume data will be sent
+ :param storageLevel: Storage level to use for storing the received objects
+ :param enableDecompression: Should netty server decompress input stream
+ :param bodyDecoder: A function used to decode body (default is utf8_decoder)
+ :return: A DStream object
+ """
+ jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+ helper = FlumeUtils._get_helper(ssc._sc)
+ jstream = helper.createStream(ssc._jssc, hostname, port, jlevel, enableDecompression)
+ return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
+
+ @staticmethod
+ def createPollingStream(ssc, addresses,
+ storageLevel=StorageLevel.MEMORY_AND_DISK_2,
+ maxBatchSize=1000,
+ parallelism=5,
+ bodyDecoder=utf8_decoder):
+ """
+ Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ This stream will poll the sink for data and will pull events as they are available.
+
+ :param ssc: StreamingContext object
+ :param addresses: List of (host, port)s on which the Spark Sink is running.
+ :param storageLevel: Storage level to use for storing the received objects
+ :param maxBatchSize: The maximum number of events to be pulled from the Spark sink
+ in a single RPC call
+ :param parallelism: Number of concurrent requests this stream should send to the sink.
+ Note that having a higher number of requests concurrently being pulled
+ will result in this stream using more threads
+ :param bodyDecoder: A function used to decode body (default is utf8_decoder)
+ :return: A DStream object
+ """
+ jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+ hosts = []
+ ports = []
+ for (host, port) in addresses:
+ hosts.append(host)
+ ports.append(port)
+ helper = FlumeUtils._get_helper(ssc._sc)
+ jstream = helper.createPollingStream(
+ ssc._jssc, hosts, ports, jlevel, maxBatchSize, parallelism)
+ return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
+
+ @staticmethod
+ def _toPythonDStream(ssc, jstream, bodyDecoder):
+ ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+ stream = DStream(jstream, ssc, ser)
+
+ def func(event):
+ headersBytes = BytesIO(event[0]) if sys.version >= "3" else StringIO(event[0])
+ headers = {}
+ strSer = UTF8Deserializer()
+ for i in range(0, read_int(headersBytes)):
+ key = strSer.loads(headersBytes)
+ value = strSer.loads(headersBytes)
+ headers[key] = value
+ body = bodyDecoder(event[1])
+ return (headers, body)
+ return stream.map(func)
+
+ @staticmethod
+ def _get_helper(sc):
+ try:
+ return sc._jvm.org.apache.spark.streaming.flume.FlumeUtilsPythonHelper()
+ except TypeError as e:
+ if str(e) == "'JavaPackage' object is not callable":
+ FlumeUtils._printErrorMsg(sc)
+ raise
+
+ @staticmethod
+ def _printErrorMsg(sc):
+ print("""
+________________________________________________________________________________________________
+
+ Spark Streaming's Flume libraries not found in class path. Try one of the following.
+
+ 1. Include the Flume library and its dependencies with in the
+ spark-submit command as
+
+ $ bin/spark-submit --packages org.apache.spark:spark-streaming-flume:%s ...
+
+ 2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+ Group Id = org.apache.spark, Artifact Id = spark-streaming-flume-assembly, Version = %s.
+ Then, include the jar in the spark-submit command as
+
+ $ bin/spark-submit --jars <spark-streaming-flume-assembly.jar> ...
+
+________________________________________________________________________________________________
+
+""" % (sc.version, sc.version))
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index eb4696c55d..d010c0e008 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -45,6 +45,7 @@ from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.storagelevel import StorageLevel
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
+from pyspark.streaming.flume import FlumeUtils
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
from pyspark.streaming.listener import StreamingListener
@@ -1260,6 +1261,148 @@ class KafkaStreamTests(PySparkStreamingTestCase):
self._validateStreamResult({"aa": 1, "bb": 2, "cc": 3}, stream)
+class FlumeStreamTests(PySparkStreamingTestCase):
+ timeout = 20 # seconds
+ duration = 1
+
+ def setUp(self):
+ super(FlumeStreamTests, self).setUp()
+ self._utils = self.ssc._jvm.org.apache.spark.streaming.flume.FlumeTestUtils()
+
+ def tearDown(self):
+ if self._utils is not None:
+ self._utils.close()
+ self._utils = None
+
+ super(FlumeStreamTests, self).tearDown()
+
+ def _startContext(self, n, compressed):
+ # Start the StreamingContext and also collect the result
+ dstream = FlumeUtils.createStream(self.ssc, "localhost", self._utils.getTestPort(),
+ enableDecompression=compressed)
+ result = []
+
+ def get_output(_, rdd):
+ for event in rdd.collect():
+ if len(result) < n:
+ result.append(event)
+ dstream.foreachRDD(get_output)
+ self.ssc.start()
+ return result
+
+ def _validateResult(self, input, result):
+ # Validate both the header and the body
+ header = {"test": "header"}
+ self.assertEqual(len(input), len(result))
+ for i in range(0, len(input)):
+ self.assertEqual(header, result[i][0])
+ self.assertEqual(input[i], result[i][1])
+
+ def _writeInput(self, input, compressed):
+ # Try to write input to the receiver until success or timeout
+ start_time = time.time()
+ while True:
+ try:
+ self._utils.writeInput(input, compressed)
+ break
+ except:
+ if time.time() - start_time < self.timeout:
+ time.sleep(0.01)
+ else:
+ raise
+
+ def test_flume_stream(self):
+ input = [str(i) for i in range(1, 101)]
+ result = self._startContext(len(input), False)
+ self._writeInput(input, False)
+ self.wait_for(result, len(input))
+ self._validateResult(input, result)
+
+ def test_compressed_flume_stream(self):
+ input = [str(i) for i in range(1, 101)]
+ result = self._startContext(len(input), True)
+ self._writeInput(input, True)
+ self.wait_for(result, len(input))
+ self._validateResult(input, result)
+
+
+class FlumePollingStreamTests(PySparkStreamingTestCase):
+ timeout = 20 # seconds
+ duration = 1
+ maxAttempts = 5
+
+ def setUp(self):
+ self._utils = self.sc._jvm.org.apache.spark.streaming.flume.PollingFlumeTestUtils()
+
+ def tearDown(self):
+ if self._utils is not None:
+ self._utils.close()
+ self._utils = None
+
+ def _writeAndVerify(self, ports):
+ # Set up the streaming context and input streams
+ ssc = StreamingContext(self.sc, self.duration)
+ try:
+ addresses = [("localhost", port) for port in ports]
+ dstream = FlumeUtils.createPollingStream(
+ ssc,
+ addresses,
+ maxBatchSize=self._utils.eventsPerBatch(),
+ parallelism=5)
+ outputBuffer = []
+
+ def get_output(_, rdd):
+ for e in rdd.collect():
+ outputBuffer.append(e)
+
+ dstream.foreachRDD(get_output)
+ ssc.start()
+ self._utils.sendDatAndEnsureAllDataHasBeenReceived()
+
+ self.wait_for(outputBuffer, self._utils.getTotalEvents())
+ outputHeaders = [event[0] for event in outputBuffer]
+ outputBodies = [event[1] for event in outputBuffer]
+ self._utils.assertOutput(outputHeaders, outputBodies)
+ finally:
+ ssc.stop(False)
+
+ def _testMultipleTimes(self, f):
+ attempt = 0
+ while True:
+ try:
+ f()
+ break
+ except:
+ attempt += 1
+ if attempt >= self.maxAttempts:
+ raise
+ else:
+ import traceback
+ traceback.print_exc()
+
+ def _testFlumePolling(self):
+ try:
+ port = self._utils.startSingleSink()
+ self._writeAndVerify([port])
+ self._utils.assertChannelsAreEmpty()
+ finally:
+ self._utils.close()
+
+ def _testFlumePollingMultipleHosts(self):
+ try:
+ port = self._utils.startSingleSink()
+ self._writeAndVerify([port])
+ self._utils.assertChannelsAreEmpty()
+ finally:
+ self._utils.close()
+
+ def test_flume_polling(self):
+ self._testMultipleTimes(self._testFlumePolling)
+
+ def test_flume_polling_multiple_hosts(self):
+ self._testMultipleTimes(self._testFlumePollingMultipleHosts)
+
+
class KinesisStreamTests(PySparkStreamingTestCase):
def test_kinesis_stream_api(self):
@@ -1348,6 +1491,23 @@ def search_kafka_assembly_jar():
return jars[0]
+def search_flume_assembly_jar():
+ SPARK_HOME = os.environ["SPARK_HOME"]
+ flume_assembly_dir = os.path.join(SPARK_HOME, "external/flume-assembly")
+ jars = search_jar(flume_assembly_dir, "spark-streaming-flume-assembly")
+ if not jars:
+ raise Exception(
+ ("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
+ "You need to build Spark with "
+ "'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
+ "'build/mvn package' before running this test.")
+ elif len(jars) > 1:
+ raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please "
+ "remove all but one") % (", ".join(jars)))
+ else:
+ return jars[0]
+
+
def search_kinesis_asl_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
kinesis_asl_assembly_dir = os.path.join(SPARK_HOME, "external/kinesis-asl-assembly")
@@ -1368,18 +1528,20 @@ are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1'
if __name__ == "__main__":
from pyspark.streaming.tests import *
kafka_assembly_jar = search_kafka_assembly_jar()
+ flume_assembly_jar = search_flume_assembly_jar()
kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
if kinesis_asl_assembly_jar is None:
kinesis_jar_present = False
- jars = kafka_assembly_jar
+ jars = "%s,%s" % (kafka_assembly_jar, flume_assembly_jar)
else:
kinesis_jar_present = True
- jars = "%s,%s" % (kafka_assembly_jar, kinesis_asl_assembly_jar)
+ jars = "%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, kinesis_asl_assembly_jar)
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
- KafkaStreamTests, StreamingListenerTests]
+ KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests,
+ StreamingListenerTests]
if kinesis_jar_present is True:
testcases.append(KinesisStreamTests)
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 10ee7d57c7..1eb680dc4c 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -18,6 +18,8 @@
package test.org.apache.spark.sql;
import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -303,27 +305,42 @@ public class JavaDataFrameSuite {
Assert.assertEquals(30000.0, actual.get(1).getDouble(2), 0.01);
}
+ private String getResource(String resource) {
+ try {
+ // The following "getResource" has different behaviors in SBT and Maven.
+ // When running in Jenkins, the file path may contain "@" when there are multiple
+ // SparkPullRequestBuilders running in the same worker
+ // (e.g., /home/jenkins/workspace/SparkPullRequestBuilder@2)
+ // When running in SBT, "@" in the file path will be returned as "@", however,
+ // when running in Maven, "@" will be encoded as "%40".
+ // Therefore, we convert it to URI then call "getPath" to decode it back so that it can both
+ // work both in SBT and Maven.
+ URL url = Thread.currentThread().getContextClassLoader().getResource(resource);
+ return url.toURI().getPath();
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Test
public void testGenericLoad() {
- Dataset<Row> df1 = context.read().format("text").load(
- Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString());
+ Dataset<Row> df1 = context.read().format("text").load(getResource("text-suite.txt"));
Assert.assertEquals(4L, df1.count());
Dataset<Row> df2 = context.read().format("text").load(
- Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString(),
- Thread.currentThread().getContextClassLoader().getResource("text-suite2.txt").toString());
+ getResource("text-suite.txt"),
+ getResource("text-suite2.txt"));
Assert.assertEquals(5L, df2.count());
}
@Test
public void testTextLoad() {
- Dataset<String> ds1 = context.read().text(
- Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString());
+ Dataset<String> ds1 = context.read().text(getResource("text-suite.txt"));
Assert.assertEquals(4L, ds1.count());
Dataset<String> ds2 = context.read().text(
- Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString(),
- Thread.currentThread().getContextClassLoader().getResource("text-suite2.txt").toString());
+ getResource("text-suite.txt"),
+ getResource("text-suite2.txt"));
Assert.assertEquals(5L, ds2.count());
}