diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-03-25 17:37:16 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-03-25 17:37:16 -0700 |
commit | 24587ce433aa30f30a5d1ed6566365f24c222a27 (patch) | |
tree | b1ff8ffa17b643d3e833be33debecf209d20ff6d /examples/src | |
parent | 54d13bed87fcf2968f77e1f1153e85184ec91d78 (diff) | |
download | spark-24587ce433aa30f30a5d1ed6566365f24c222a27.tar.gz spark-24587ce433aa30f30a5d1ed6566365f24c222a27.tar.bz2 spark-24587ce433aa30f30a5d1ed6566365f24c222a27.zip |
[SPARK-14073][STREAMING][TEST-MAVEN] Move flume back to Spark
## What changes were proposed in this pull request?
This PR moves flume back to Spark as per the discussion in the dev mail-list.
## How was this patch tested?
Existing Jenkins tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #11895 from zsxwing/move-flume-back.
Diffstat (limited to 'examples/src')
3 files changed, 212 insertions, 0 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java new file mode 100644 index 0000000000..da56637fe8 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.examples.streaming.StreamingExamples; +import org.apache.spark.streaming.*; +import org.apache.spark.streaming.api.java.*; +import org.apache.spark.streaming.flume.FlumeUtils; +import org.apache.spark.streaming.flume.SparkFlumeEvent; + +/** + * Produces a count of events received from Flume. + * + * This should be used in conjunction with an AvroSink in Flume. It will start + * an Avro server on at the request host:port address and listen for requests. + * Your Flume AvroSink should be pointed to this address. + * + * Usage: JavaFlumeEventCount <host> <port> + * <host> is the host the Flume receiver will be started on - a receiver + * creates a server and listens for flume events. + * <port> is the port the Flume receiver will listen on. + * + * To run this example: + * `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>` + */ +public final class JavaFlumeEventCount { + private JavaFlumeEventCount() { + } + + public static void main(String[] args) { + if (args.length != 2) { + System.err.println("Usage: JavaFlumeEventCount <host> <port>"); + System.exit(1); + } + + StreamingExamples.setStreamingLogLevels(); + + String host = args[0]; + int port = Integer.parseInt(args[1]); + + Duration batchInterval = new Duration(2000); + SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount"); + JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval); + JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port); + + flumeStream.count(); + + flumeStream.count().map(new Function<Long, String>() { + @Override + public String call(Long in) { + return "Received " + in + " flume events."; + } + }).print(); + + ssc.start(); + ssc.awaitTermination(); + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala new file mode 100644 index 0000000000..91e52e4eff --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.streaming + +import org.apache.spark.SparkConf +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ +import org.apache.spark.streaming.flume._ +import org.apache.spark.util.IntParam + +/** + * Produces a count of events received from Flume. + * + * This should be used in conjunction with an AvroSink in Flume. It will start + * an Avro server on at the request host:port address and listen for requests. + * Your Flume AvroSink should be pointed to this address. + * + * Usage: FlumeEventCount <host> <port> + * <host> is the host the Flume receiver will be started on - a receiver + * creates a server and listens for flume events. + * <port> is the port the Flume receiver will listen on. + * + * To run this example: + * `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> ` + */ +object FlumeEventCount { + def main(args: Array[String]) { + if (args.length < 2) { + System.err.println( + "Usage: FlumeEventCount <host> <port>") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + val Array(host, IntParam(port)) = args + + val batchInterval = Milliseconds(2000) + + // Create the context and set the batch size + val sparkConf = new SparkConf().setAppName("FlumeEventCount") + val ssc = new StreamingContext(sparkConf, batchInterval) + + // Create a flume stream + val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2) + + // Print out the count of events received from this server in each batch + stream.count().map(cnt => "Received " + cnt + " flume events." ).print() + + ssc.start() + ssc.awaitTermination() + } +} +// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala new file mode 100644 index 0000000000..dd725d72c2 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.streaming + +import org.apache.spark.SparkConf +import org.apache.spark.streaming._ +import org.apache.spark.streaming.flume._ +import org.apache.spark.util.IntParam + +/** + * Produces a count of events received from Flume. + * + * This should be used in conjunction with the Spark Sink running in a Flume agent. See + * the Spark Streaming programming guide for more details. + * + * Usage: FlumePollingEventCount <host> <port> + * `host` is the host on which the Spark Sink is running. + * `port` is the port at which the Spark Sink is listening. + * + * To run this example: + * `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] ` + */ +object FlumePollingEventCount { + def main(args: Array[String]) { + if (args.length < 2) { + System.err.println( + "Usage: FlumePollingEventCount <host> <port>") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + val Array(host, IntParam(port)) = args + + val batchInterval = Milliseconds(2000) + + // Create the context and set the batch size + val sparkConf = new SparkConf().setAppName("FlumePollingEventCount") + val ssc = new StreamingContext(sparkConf, batchInterval) + + // Create a flume stream that polls the Spark Sink running in a Flume agent + val stream = FlumeUtils.createPollingStream(ssc, host, port) + + // Print out the count of events received from this server in each batch + stream.count().map(cnt => "Received " + cnt + " flume events." ).print() + + ssc.start() + ssc.awaitTermination() + } +} +// scalastyle:on println |