author     Shixiong Zhu <shixiong@databricks.com>    2016-03-25 17:37:16 -0700
committer  Reynold Xin <rxin@databricks.com>         2016-03-25 17:37:16 -0700
commit     24587ce433aa30f30a5d1ed6566365f24c222a27 (patch)
tree       b1ff8ffa17b643d3e833be33debecf209d20ff6d /examples
parent     54d13bed87fcf2968f77e1f1153e85184ec91d78 (diff)
[SPARK-14073][STREAMING][TEST-MAVEN] Move flume back to Spark
## What changes were proposed in this pull request?

This PR moves flume back to Spark as per the discussion in the dev mail-list.

## How was this patch tested?

Existing Jenkins tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11895 from zsxwing/move-flume-back.
Diffstat (limited to 'examples')
-rw-r--r--  examples/pom.xml                                                                          |  5
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java      | 75
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala        | 70
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala | 67
4 files changed, 217 insertions(+), 0 deletions(-)
diff --git a/examples/pom.xml b/examples/pom.xml
index 1aa730c0dc..b7f37978b9 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -67,6 +67,11 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
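
For applications built outside the Spark tree, the same connector can be declared against a published Spark release. A minimal sketch of the dependency (the Scala suffix and version below are illustrative; match them to your Spark build):

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume_2.11</artifactId>
  <version>1.6.1</version>
</dependency>
```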
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
new file mode 100644
index 0000000000..da56637fe8
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.flume.FlumeUtils;
+import org.apache.spark.streaming.flume.SparkFlumeEvent;
+
+/**
+ * Produces a count of events received from Flume.
+ *
+ * This should be used in conjunction with an AvroSink in Flume. It will start
+ * an Avro server at the requested host:port address and listen for requests.
+ * Your Flume AvroSink should be pointed to this address.
+ *
+ * Usage: JavaFlumeEventCount <host> <port>
+ * <host> is the host the Flume receiver will be started on - a receiver
+ * creates a server and listens for flume events.
+ * <port> is the port the Flume receiver will listen on.
+ *
+ * To run this example:
+ * `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>`
+ */
+public final class JavaFlumeEventCount {
+ private JavaFlumeEventCount() {
+ }
+
+ public static void main(String[] args) {
+ if (args.length != 2) {
+ System.err.println("Usage: JavaFlumeEventCount <host> <port>");
+ System.exit(1);
+ }
+
+ StreamingExamples.setStreamingLogLevels();
+
+ String host = args[0];
+ int port = Integer.parseInt(args[1]);
+
+ Duration batchInterval = new Duration(2000);
+ SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
+ JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
+ JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
+
+ flumeStream.count();
+
+ flumeStream.count().map(new Function<Long, String>() {
+ @Override
+ public String call(Long in) {
+ return "Received " + in + " flume events.";
+ }
+ }).print();
+
+ ssc.start();
+ ssc.awaitTermination();
+ }
+}
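
As the class comment notes, this is the push model: the example starts an Avro server and a Flume AvroSink pushes events to it. A minimal Flume agent sketch wired to such a receiver (the agent and channel names are placeholders, and the source definition an agent would also need is omitted):

```properties
# Push events to the Avro server started by JavaFlumeEventCount.
# (Sources omitted for brevity; <host>/<port> must match the
# arguments passed to the example.)
agent.channels = memoryChannel
agent.channels.memoryChannel.type = memory

agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = <host>
agent.sinks.avroSink.port = <port>
```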
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
new file mode 100644
index 0000000000..91e52e4eff
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.flume._
+import org.apache.spark.util.IntParam
+
+/**
+ * Produces a count of events received from Flume.
+ *
+ * This should be used in conjunction with an AvroSink in Flume. It will start
+ * an Avro server at the requested host:port address and listen for requests.
+ * Your Flume AvroSink should be pointed to this address.
+ *
+ * Usage: FlumeEventCount <host> <port>
+ * <host> is the host the Flume receiver will be started on - a receiver
+ * creates a server and listens for flume events.
+ * <port> is the port the Flume receiver will listen on.
+ *
+ * To run this example:
+ * `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port>`
+ */
+object FlumeEventCount {
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println(
+ "Usage: FlumeEventCount <host> <port>")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ val Array(host, IntParam(port)) = args
+
+ val batchInterval = Milliseconds(2000)
+
+ // Create the context and set the batch size
+ val sparkConf = new SparkConf().setAppName("FlumeEventCount")
+ val ssc = new StreamingContext(sparkConf, batchInterval)
+
+ // Create a flume stream
+ val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
+
+ // Print out the count of events received from this server in each batch
+ stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
+// scalastyle:on println
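
A quick way to exercise either of the receiver-based examples is Flume's bundled Avro client, which sends a file's contents as Avro events to the receiver's address. A sketch, assuming the example is already running and listening on localhost:44444 and a Flume distribution is at hand:

```
$ bin/flume-ng avro-client -H localhost -p 44444 -F /path/to/some/file
```

Each line of the file is delivered as one event, so the printed count should match the file's line count.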
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
new file mode 100644
index 0000000000..dd725d72c2
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.flume._
+import org.apache.spark.util.IntParam
+
+/**
+ * Produces a count of events received from Flume.
+ *
+ * This should be used in conjunction with the Spark Sink running in a Flume agent. See
+ * the Spark Streaming programming guide for more details.
+ *
+ * Usage: FlumePollingEventCount <host> <port>
+ * `host` is the host on which the Spark Sink is running.
+ * `port` is the port at which the Spark Sink is listening.
+ *
+ * To run this example:
+ * `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount <host> <port>`
+ */
+object FlumePollingEventCount {
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println(
+ "Usage: FlumePollingEventCount <host> <port>")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ val Array(host, IntParam(port)) = args
+
+ val batchInterval = Milliseconds(2000)
+
+ // Create the context and set the batch size
+ val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
+ val ssc = new StreamingContext(sparkConf, batchInterval)
+
+ // Create a flume stream that polls the Spark Sink running in a Flume agent
+ val stream = FlumeUtils.createPollingStream(ssc, host, port)
+
+ // Print out the count of events received from this server in each batch
+ stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
+// scalastyle:on println
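
Unlike the two receiver-based examples above, this one pulls from a Spark Sink running inside the Flume agent, which requires the spark-streaming-flume-sink artifact (and its Scala library dependency) on the agent's classpath. A minimal agent sketch for that sink, with placeholder host/port and channel names, following the pattern in the Spark Streaming + Flume integration guide:

```properties
# Custom sink that buffers events until FlumePollingEventCount polls them.
agent.sinks = sparkSink
agent.sinks.sparkSink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.sparkSink.hostname = <host>
agent.sinks.sparkSink.port = <port>
agent.sinks.sparkSink.channel = memoryChannel
```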