path: root/extras/kinesis-asl
author     Tathagata Das <tathagata.das1565@gmail.com>  2015-05-18 18:24:15 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-05-18 18:24:15 -0700
commit     3a6003866ade45974b43a9e785ec35fb76a32b99 (patch)
tree       f41298288d49478b61e87ed04cf2260814d351d2 /extras/kinesis-asl
parent     0a7a94eab5fba3d2f2ef14a70c2c1bf4ee21b626 (diff)
[SPARK-7692] Updated Kinesis examples
- Updated Kinesis examples to use stable API
- Cleaned up comments, etc.
- Renamed KinesisWordCountProducerASL to KinesisWordProducerASL

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6249 from tdas/kinesis-examples and squashes the following commits:

7cc307b [Tathagata Das] More tweaks
f080872 [Tathagata Das] More cleanup
841987f [Tathagata Das] Small update
011cbe2 [Tathagata Das] More fixes
b0d74f9 [Tathagata Das] Updated examples.
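For readers skimming the diff: the core API change is the stable KinesisUtils.createStream signature, which now takes an explicit Kinesis app name and region name. A minimal sketch of the new call shape as used by the updated examples below (the object name, app name, stream name, and region here are placeholders, not from this patch):

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.kinesis.KinesisUtils

    object CreateStreamSketch {
      def main(args: Array[String]): Unit = {
        val batchInterval = Milliseconds(2000)
        val ssc = new StreamingContext(
          new SparkConf().setAppName("CreateStreamSketch"), batchInterval)
        // New argument order per this patch: the Kinesis app name (used for the
        // DynamoDB checkpoint table) and the region name are explicit, and the
        // checkpoint interval now follows the initial position.
        val stream = KinesisUtils.createStream(
          ssc, "myAppName", "mySparkStream",
          "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
          InitialPositionInStream.LATEST, batchInterval,
          StorageLevel.MEMORY_AND_DISK_2)
        stream.count().print()
        ssc.start()
        ssc.awaitTermination()
      }
    }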
Diffstat (limited to 'extras/kinesis-asl')
-rw-r--r--  extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java  245
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala    260
2 files changed, 268 insertions(+), 237 deletions(-)
diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
index b0bff27a61..06e0ff28af 100644
--- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
+++ b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
+import com.amazonaws.regions.RegionUtils;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -40,140 +41,146 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn
import com.google.common.collect.Lists;
/**
- * Java-friendly Kinesis Spark Streaming WordCount example
+ * Consumes messages from an Amazon Kinesis stream and does wordcount.
*
- * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details
- * on the Kinesis Spark Streaming integration.
+ * This example spins up 1 Kinesis Receiver per shard for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given stream.
*
- * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard
- * for the given stream.
- * It then starts pulling from the last checkpointed sequence number of the given
- * <stream-name> and <endpoint-url>.
+ * Usage: JavaKinesisWordCountASL <app-name> <stream-name> <endpoint-url>
+ * <app-name> is the name of the consumer app, used to track the read data in DynamoDB
+ * <stream-name> name of the Kinesis stream (e.g. mySparkStream)
+ * <endpoint-url> endpoint of the Kinesis service
+ * (e.g. https://kinesis.us-east-1.amazonaws.com)
*
- * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
- *
- * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
- * in the following order of precedence:
- * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
- * Java System Properties - aws.accessKeyId and aws.secretKey
- * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
- * Instance profile credentials - delivered through the Amazon EC2 metadata service
- *
- * Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>
- * <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
- * <endpoint-url> is the endpoint of the Kinesis service
- * (ie. https://kinesis.us-east-1.amazonaws.com)
*
* Example:
- * $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ * # export AWS keys if necessary
+ * $ export AWS_ACCESS_KEY_ID=<your-access-key>
* $ export AWS_SECRET_KEY=<your-secret-key>
- * $ $SPARK_HOME/bin/run-example \
- * org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream \
- * https://kinesis.us-east-1.amazonaws.com
*
- * Note that number of workers/threads should be 1 more than the number of receivers.
- * This leaves one thread available for actually processing the data.
+ * # run the example
+ * $ $SPARK_HOME/bin/run-example streaming.JavaKinesisWordCountASL myAppName mySparkStream \
+ * https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class called KinesisWordProducerASL which puts dummy data
+ * onto the Kinesis stream.
*
- * There is a companion helper class called KinesisWordCountProducerASL which puts dummy data
- * onto the Kinesis stream.
- * Usage instructions for KinesisWordCountProducerASL are provided in the class definition.
+ * This code uses the DefaultAWSCredentialsProviderChain to find credentials
+ * in the following order:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 metadata service
+ * For more information, see
+ * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
+ * the Kinesis Spark Streaming integration.
*/
public final class JavaKinesisWordCountASL { // needs to be public for access from run-example
- private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
- private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
-
- /* Make the constructor private to enforce singleton */
- private JavaKinesisWordCountASL() {
+ private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
+ private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
+
+ public static void main(String[] args) {
+ // Check that all required args were passed in.
+ if (args.length != 3) {
+ System.err.println(
+ "Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>\n\n" +
+ " <app-name> is the name of the app, used to track the read data in DynamoDB\n" +
+ " <stream-name> is the name of the Kinesis stream\n" +
+ " <endpoint-url> is the endpoint of the Kinesis service\n" +
+ " (e.g. https://kinesis.us-east-1.amazonaws.com)\n" +
+ "Generate data for the Kinesis stream using the example KinesisWordProducerASL.\n" +
+ "See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more\n" +
+ "details.\n"
+ );
+ System.exit(1);
}
- public static void main(String[] args) {
- /* Check that all required args were passed in. */
- if (args.length < 2) {
- System.err.println(
- "Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>\n" +
- " <stream-name> is the name of the Kinesis stream\n" +
- " <endpoint-url> is the endpoint of the Kinesis service\n" +
- " (e.g. https://kinesis.us-east-1.amazonaws.com)\n");
- System.exit(1);
- }
-
- StreamingExamples.setStreamingLogLevels();
-
- /* Populate the appropriate variables from the given args */
- String streamName = args[0];
- String endpointUrl = args[1];
- /* Set the batch interval to a fixed 2000 millis (2 seconds) */
- Duration batchInterval = new Duration(2000);
-
- /* Create a Kinesis client in order to determine the number of shards for the given stream */
- AmazonKinesisClient kinesisClient = new AmazonKinesisClient(
- new DefaultAWSCredentialsProviderChain());
- kinesisClient.setEndpoint(endpointUrl);
-
- /* Determine the number of shards from the stream */
- int numShards = kinesisClient.describeStream(streamName)
- .getStreamDescription().getShards().size();
-
- /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard */
- int numStreams = numShards;
-
- /* Setup the Spark config. */
- SparkConf sparkConfig = new SparkConf().setAppName("KinesisWordCount");
-
- /* Kinesis checkpoint interval. Same as batchInterval for this example. */
- Duration checkpointInterval = batchInterval;
+ // Set default log4j logging level to WARN to hide Spark logs
+ StreamingExamples.setStreamingLogLevels();
+
+ // Populate the appropriate variables from the given args
+ String kinesisAppName = args[0];
+ String streamName = args[1];
+ String endpointUrl = args[2];
+
+ // Create a Kinesis client in order to determine the number of shards for the given stream
+ AmazonKinesisClient kinesisClient =
+ new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
+ kinesisClient.setEndpoint(endpointUrl);
+ int numShards =
+ kinesisClient.describeStream(streamName).getStreamDescription().getShards().size();
+
+
+ // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
+ // This is not a necessity; if there are fewer receivers/DStreams than the number of shards,
+ // then the shards will be automatically distributed among the receivers and each receiver
+ // will receive data from multiple shards.
+ int numStreams = numShards;
+
+ // Spark Streaming batch interval
+ Duration batchInterval = new Duration(2000);
+
+ // Kinesis checkpoint interval. Same as batchInterval for this example.
+ Duration kinesisCheckpointInterval = batchInterval;
+
+ // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
+ // DynamoDB of the same region as the Kinesis stream
+ String regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName();
+
+ // Setup the Spark config and StreamingContext
+ SparkConf sparkConfig = new SparkConf().setAppName("JavaKinesisWordCountASL");
+ JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);
+
+ // Create the Kinesis DStreams
+ List<JavaDStream<byte[]>> streamsList = new ArrayList<JavaDStream<byte[]>>(numStreams);
+ for (int i = 0; i < numStreams; i++) {
+ streamsList.add(
+ KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+ InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2())
+ );
+ }
- /* Setup the StreamingContext */
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);
+ // Union all the streams if there is more than 1 stream
+ JavaDStream<byte[]> unionStreams;
+ if (streamsList.size() > 1) {
+ unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
+ } else {
+ // Otherwise, just use the 1 stream
+ unionStreams = streamsList.get(0);
+ }
- /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
- List<JavaDStream<byte[]>> streamsList = new ArrayList<JavaDStream<byte[]>>(numStreams);
- for (int i = 0; i < numStreams; i++) {
- streamsList.add(
- KinesisUtils.createStream(jssc, streamName, endpointUrl, checkpointInterval,
- InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2())
- );
+ // Convert each line of Array[Byte] to String, and split into words
+ JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
+ @Override
+ public Iterable<String> call(byte[] line) {
+ return Lists.newArrayList(WORD_SEPARATOR.split(new String(line)));
+ }
+ });
+
+ // Map each word to a (word, 1) tuple so we can reduce by key to count the words
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2<String, Integer> call(String s) {
+ return new Tuple2<String, Integer>(s, 1);
+ }
}
-
- /* Union all the streams if there is more than 1 stream */
- JavaDStream<byte[]> unionStreams;
- if (streamsList.size() > 1) {
- unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
- } else {
- /* Otherwise, just use the 1 stream */
- unionStreams = streamsList.get(0);
+ ).reduceByKey(
+ new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
}
+ );
- /*
- * Split each line of the union'd DStreams into multiple words using flatMap to produce the collection.
- * Convert lines of byte[] to multiple Strings by first converting to String, then splitting on WORD_SEPARATOR.
- */
- JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
- @Override
- public Iterable<String> call(byte[] line) {
- return Lists.newArrayList(WORD_SEPARATOR.split(new String(line)));
- }
- });
-
- /* Map each word to a (word, 1) tuple, then reduce/aggregate by word. */
- JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<String, Integer>(s, 1);
- }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- });
-
- /* Print the first 10 wordCounts */
- wordCounts.print();
-
- /* Start the streaming context and await termination */
- jssc.start();
- jssc.awaitTermination();
- }
+ // Print the first 10 wordCounts
+ wordCounts.print();
+
+ // Start the streaming context and await termination
+ jssc.start();
+ jssc.awaitTermination();
+ }
}
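Note: the removed scaladoc advice about thread counts still holds. Each Kinesis receiver permanently occupies a core, so a local run needs at least one more thread than receivers or no batch will ever be processed. A hedged sketch of what that looks like (the master URL and shard count are illustrative only, not part of this patch):

    import org.apache.spark.SparkConf

    // Illustrative only: with 2 receivers (e.g. one per shard), reserve at
    // least 3 local threads so one remains free to process the received data.
    val numStreams = 2
    val sparkConfig = new SparkConf()
      .setAppName("JavaKinesisWordCountASL")
      .setMaster(s"local[${numStreams + 1}]")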
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
index 32da0858d1..640ca049e2 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -18,213 +18,238 @@
package org.apache.spark.examples.streaming
import java.nio.ByteBuffer
+
import scala.util.Random
-import org.apache.spark.Logging
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.Milliseconds
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
-import org.apache.spark.streaming.kinesis.KinesisUtils
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+
+import com.amazonaws.auth.{DefaultAWSCredentialsProviderChain, BasicAWSCredentials}
+import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.PutRecordRequest
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
+import org.apache.log4j.{Level, Logger}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
+import org.apache.spark.streaming.kinesis.KinesisUtils
+
/**
- * Kinesis Spark Streaming WordCount example.
+ * Consumes messages from an Amazon Kinesis stream and does wordcount.
*
- * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details on
- * the Kinesis Spark Streaming integration.
+ * This example spins up 1 Kinesis Receiver per shard for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given stream.
*
- * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard
- * for the given stream.
- * It then starts pulling from the last checkpointed sequence number of the given
- * <stream-name> and <endpoint-url>.
+ * Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url>
+ * <app-name> is the name of the consumer app, used to track the read data in DynamoDB
+ * <stream-name> name of the Kinesis stream (e.g. mySparkStream)
+ * <endpoint-url> endpoint of the Kinesis service
+ * (e.g. https://kinesis.us-east-1.amazonaws.com)
*
- * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
- *
- * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
- * in the following order of precedence:
- * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
- * Java System Properties - aws.accessKeyId and aws.secretKey
- * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
- * Instance profile credentials - delivered through the Amazon EC2 metadata service
- *
- * Usage: KinesisWordCountASL <stream-name> <endpoint-url>
- * <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
- * <endpoint-url> is the endpoint of the Kinesis service
- * (ie. https://kinesis.us-east-1.amazonaws.com)
*
* Example:
- * $ export AWS_ACCESS_KEY_ID=<your-access-key>
- * $ export AWS_SECRET_KEY=<your-secret-key>
- * $ $SPARK_HOME/bin/run-example \
- * org.apache.spark.examples.streaming.KinesisWordCountASL mySparkStream \
- * https://kinesis.us-east-1.amazonaws.com
+ * # export AWS keys if necessary
+ * $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ * $ export AWS_SECRET_KEY=<your-secret-key>
+ *
+ * # run the example
+ * $ $SPARK_HOME/bin/run-example streaming.KinesisWordCountASL myAppName mySparkStream \
+ * https://kinesis.us-east-1.amazonaws.com
*
- *
- * Note that number of workers/threads should be 1 more than the number of receivers.
- * This leaves one thread available for actually processing the data.
+ * There is a companion helper class called KinesisWordProducerASL which puts dummy data
+ * onto the Kinesis stream.
*
- * There is a companion helper class below called KinesisWordCountProducerASL which puts
- * dummy data onto the Kinesis stream.
- * Usage instructions for KinesisWordCountProducerASL are provided in that class definition.
+ * This code uses the DefaultAWSCredentialsProviderChain to find credentials
+ * in the following order:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 metadata service
+ * For more information, see
+ * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
+ * the Kinesis Spark Streaming integration.
*/
-private object KinesisWordCountASL extends Logging {
+object KinesisWordCountASL extends Logging {
def main(args: Array[String]) {
- /* Check that all required args were passed in. */
- if (args.length < 2) {
+ // Check that all required args were passed in.
+ if (args.length != 3) {
System.err.println(
"""
- |Usage: KinesisWordCount <stream-name> <endpoint-url>
+ |Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url>
+ |
+ | <app-name> is the name of the consumer app, used to track the read data in DynamoDB
| <stream-name> is the name of the Kinesis stream
| <endpoint-url> is the endpoint of the Kinesis service
| (e.g. https://kinesis.us-east-1.amazonaws.com)
+ |
+ |Generate input data for Kinesis stream using the example KinesisWordProducerASL.
+ |See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more
+ |details.
""".stripMargin)
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
- /* Populate the appropriate variables from the given args */
- val Array(streamName, endpointUrl) = args
+ // Populate the appropriate variables from the given args
+ val Array(appName, streamName, endpointUrl) = args
- /* Determine the number of shards from the stream */
- val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
+
+ // Determine the number of shards from the stream using the low-level Kinesis Client
+ // from the AWS Java SDK.
+ val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
+ require(credentials != null,
+ "No AWS credentials found. Please specify credentials using one of the methods specified " +
+ "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
+ val kinesisClient = new AmazonKinesisClient(credentials)
kinesisClient.setEndpoint(endpointUrl)
- val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards()
- .size()
+ val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
+
- /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
+ // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
+ // This is not a necessity; if there are fewer receivers/DStreams than the number of shards,
+ // then the shards will be automatically distributed among the receivers and each receiver
+ // will receive data from multiple shards.
val numStreams = numShards
- /* Setup the and SparkConfig and StreamingContext */
- /* Spark Streaming batch interval */
+ // Spark Streaming batch interval
val batchInterval = Milliseconds(2000)
- val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
- val ssc = new StreamingContext(sparkConfig, batchInterval)
- /* Kinesis checkpoint interval. Same as batchInterval for this example. */
+ // Kinesis checkpoint interval is the interval at which DynamoDB is updated with information
+ // on the sequence number of records that have been received. Same as batchInterval for this example.
val kinesisCheckpointInterval = batchInterval
- /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
+ // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
+ // DynamoDB of the same region as the Kinesis stream
+ val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
+
+ // Setup the SparkConfig and StreamingContext
+ val sparkConfig = new SparkConf().setAppName("KinesisWordCountASL")
+ val ssc = new StreamingContext(sparkConfig, batchInterval)
+
+ // Create the Kinesis DStreams
val kinesisStreams = (0 until numStreams).map { i =>
- KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
- InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
+ KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
+ InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
}
- /* Union all the streams */
+ // Union all the streams
val unionStreams = ssc.union(kinesisStreams)
- /* Convert each line of Array[Byte] to String, split into words, and count them */
- val words = unionStreams.flatMap(byteArray => new String(byteArray)
- .split(" "))
+ // Convert each line of Array[Byte] to String, and split into words
+ val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
- /* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */
+ // Map each word to a (word, 1) tuple so we can reduce by key to count the words
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
-
- /* Print the first 10 wordCounts */
+
+ // Print the first 10 wordCounts
wordCounts.print()
- /* Start the streaming context and await termination */
+ // Start the streaming context and await termination
ssc.start()
ssc.awaitTermination()
}
}
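As the comment above notes, the KCL checkpoint interval is independent of the Spark batch interval; this example merely sets them equal. A sketch of decoupling them (the 10-second value is a hypothetical choice, not from this patch):

    import org.apache.spark.streaming.Milliseconds

    val batchInterval = Milliseconds(2000)
    // Checkpointing to DynamoDB less often than every batch reduces write
    // traffic, at the cost of re-reading more records if a receiver restarts.
    val kinesisCheckpointInterval = Milliseconds(10000)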
/**
- * Usage: KinesisWordCountProducerASL <stream-name> <kinesis-endpoint-url>
- * <recordsPerSec> <wordsPerRecord>
+ * Usage: KinesisWordProducerASL <stream-name> <endpoint-url> \
+ * <records-per-sec> <words-per-record>
+ *
 * <stream-name> is the name of the Kinesis stream (e.g. mySparkStream)
- * <kinesis-endpoint-url> is the endpoint of the Kinesis service
+ * <endpoint-url> is the endpoint of the Kinesis service
 * (e.g. https://kinesis.us-east-1.amazonaws.com)
* <records-per-sec> is the rate of records per second to put onto the stream
 * <words-per-record> is the number of words per record to put onto the stream
*
* Example:
- * $ export AWS_ACCESS_KEY_ID=<your-access-key>
- * $ export AWS_SECRET_KEY=<your-secret-key>
- * $ $SPARK_HOME/bin/run-example \
- * org.apache.spark.examples.streaming.KinesisWordCountProducerASL mySparkStream \
- * https://kinesis.us-east-1.amazonaws.com 10 5
+ * $ $SPARK_HOME/bin/run-example streaming.KinesisWordProducerASL mySparkStream \
+ * https://kinesis.us-east-1.amazonaws.com 10 5
*/
-private object KinesisWordCountProducerASL {
+object KinesisWordProducerASL {
def main(args: Array[String]) {
- if (args.length < 4) {
- System.err.println("Usage: KinesisWordCountProducerASL <stream-name> <endpoint-url>" +
- " <records-per-sec> <words-per-record>")
+ if (args.length != 4) {
+ System.err.println(
+ """
+ |Usage: KinesisWordProducerASL <stream-name> <endpoint-url> <records-per-sec> <words-per-record>
+ |
+ | <stream-name> is the name of the Kinesis stream
+ | <endpoint-url> is the endpoint of the Kinesis service
+ | (e.g. https://kinesis.us-east-1.amazonaws.com)
+ | <records-per-sec> is the rate of records per second to put onto the stream
+ | <words-per-record> is the number of words per record to put onto the stream
+ |
+ """.stripMargin)
+
System.exit(1)
}
+ // Set default log4j logging level to WARN to hide Spark logs
StreamingExamples.setStreamingLogLevels()
- /* Populate the appropriate variables from the given args */
+ // Populate the appropriate variables from the given args
val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args
- /* Generate the records and return the totals */
- val totals = generate(stream, endpoint, recordsPerSecond.toInt, wordsPerRecord.toInt)
+ // Generate the records and return the totals
+ val totals = generate(stream, endpoint, recordsPerSecond.toInt,
+ wordsPerRecord.toInt)
- /* Print the array of (index, total) tuples */
- println("Totals")
- totals.foreach(total => println(total.toString()))
+ // Print the array of (word, total) tuples
+ println("Totals for the words sent")
+ totals.foreach(println(_))
}
def generate(stream: String,
endpoint: String,
recordsPerSecond: Int,
- wordsPerRecord: Int): Seq[(Int, Int)] = {
-
- val MaxRandomInts = 10
+ wordsPerRecord: Int): Seq[(String, Int)] = {
- /* Create the Kinesis client */
+ val randomWords = List("spark", "you", "are", "my", "father")
+ val totals = scala.collection.mutable.Map[String, Int]()
+
+ // Create the low-level Kinesis Client from the AWS Java SDK.
val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
kinesisClient.setEndpoint(endpoint)
println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
- s" $recordsPerSecond records per second and $wordsPerRecord words per record");
-
- val totals = new Array[Int](MaxRandomInts)
- /* Put String records onto the stream per the given recordPerSec and wordsPerRecord */
- for (i <- 1 to 5) {
-
- /* Generate recordsPerSec records to put onto the stream */
- val records = (1 to recordsPerSecond.toInt).map { recordNum =>
- /*
- * Randomly generate each wordsPerRec words between 0 (inclusive)
- * and MAX_RANDOM_INTS (exclusive)
- */
+ s" $recordsPerSecond records per second and $wordsPerRecord words per record")
+
+ // Iterate and put records onto the stream per the given recordPerSec and wordsPerRecord
+ for (i <- 1 to 10) {
+ // Generate recordsPerSec records to put onto the stream
+ (1 to recordsPerSecond.toInt).foreach { recordNum =>
+ // Randomly generate wordsPerRecord number of words
val data = (1 to wordsPerRecord.toInt).map(x => {
- /* Generate the random int */
- val randomInt = Random.nextInt(MaxRandomInts)
+ // Get a random index to a word
+ val randomWordIdx = Random.nextInt(randomWords.size)
+ val randomWord = randomWords(randomWordIdx)
- /* Keep track of the totals */
- totals(randomInt) += 1
+ // Increment total count to compare to server counts later
+ totals(randomWord) = totals.getOrElse(randomWord, 0) + 1
- randomInt.toString()
+ randomWord
}).mkString(" ")
- /* Create a partitionKey based on recordNum */
+ // Create a partitionKey based on recordNum
val partitionKey = s"partitionKey-$recordNum"
- /* Create a PutRecordRequest with an Array[Byte] version of the data */
+ // Create a PutRecordRequest with an Array[Byte] version of the data
val putRecordRequest = new PutRecordRequest().withStreamName(stream)
.withPartitionKey(partitionKey)
- .withData(ByteBuffer.wrap(data.getBytes()));
+ .withData(ByteBuffer.wrap(data.getBytes()))
- /* Put the record onto the stream and capture the PutRecordResult */
- val putRecordResult = kinesisClient.putRecord(putRecordRequest);
+ // Put the record onto the stream and capture the PutRecordResult
+ val putRecordResult = kinesisClient.putRecord(putRecordRequest)
}
- /* Sleep for a second */
+ // Sleep for a second
Thread.sleep(1000)
println("Sent " + recordsPerSecond + " records")
}
-
- /* Convert the totals to (index, total) tuple */
- (0 to (MaxRandomInts - 1)).zip(totals)
+ // Convert the totals to (word, total) tuples, sorted by word
+ totals.toSeq.sortBy(_._1)
}
}
@@ -233,8 +258,7 @@ private object KinesisWordCountProducerASL {
* This has been lifted from the examples/ project to remove the circular dependency.
*/
private[streaming] object StreamingExamples extends Logging {
-
- /** Set reasonable logging levels for streaming if the user has not configured log4j. */
+ // Set reasonable logging levels for streaming if the user has not configured log4j.
def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
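The hunk is cut off here. For context, the body of this guard conventionally logs a note and drops the root logger to WARN; a sketch of that pattern (assuming the log4j 1.x API imported above, and not necessarily the exact code in this commit):

    import org.apache.log4j.{Level, Logger}

    def setStreamingLogLevels() {
      val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
      if (!log4jInitialized) {
        // No user-supplied log4j.properties: quiet Spark's INFO logging so the
        // example's word counts remain visible on the console.
        Logger.getRootLogger.setLevel(Level.WARN)
      }
    }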