diff options
author | aniketbhatnagar <aniket.bhatnagar@gmail.com> | 2014-09-26 09:47:58 -0700 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2014-09-26 09:48:46 -0700 |
commit | d16e161d744b27291fd2ee7e3578917ee14d83f9 (patch) | |
tree | 33ee08c660ea60373bc802fdeae250c5fc596ec1 /extras/kinesis-asl/src | |
parent | 1aa549ba9839565274a12c52fa1075b424f138a6 (diff) | |
download | spark-d16e161d744b27291fd2ee7e3578917ee14d83f9.tar.gz spark-d16e161d744b27291fd2ee7e3578917ee14d83f9.tar.bz2 spark-d16e161d744b27291fd2ee7e3578917ee14d83f9.zip |
SPARK-3639 | Removed settings master in examples
This patch removes setting of master as local in Kinesis examples so that users can set it using submit-job.
Author: aniketbhatnagar <aniket.bhatnagar@gmail.com>
Closes #2536 from aniketbhatnagar/Kinesis-Examples-Master-Unset and squashes the following commits:
c9723ac [aniketbhatnagar] Merge remote-tracking branch 'origin/Kinesis-Examples-Master-Unset' into Kinesis-Examples-Master-Unset
fec8ead [aniketbhatnagar] SPARK-3639 | Removed settings master in examples
31cdc59 [aniketbhatnagar] SPARK-3639 | Removed settings master in examples
Diffstat (limited to 'extras/kinesis-asl/src')
2 files changed, 9 insertions, 13 deletions
diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java index aa917d0575..b0bff27a61 100644 --- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java +++ b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java @@ -71,6 +71,9 @@ import com.google.common.collect.Lists; * org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream \ * https://kinesis.us-east-1.amazonaws.com * + * Note that number of workers/threads should be 1 more than the number of receivers. + * This leaves one thread available for actually processing the data. + * * There is a companion helper class called KinesisWordCountProducerASL which puts dummy data * onto the Kinesis stream. * Usage instructions for KinesisWordCountProducerASL are provided in the class definition. @@ -114,12 +117,8 @@ public final class JavaKinesisWordCountASL { // needs to be public for access fr /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard */ int numStreams = numShards; - /* Must add 1 more thread than the number of receivers or the output won't show properly from the driver */ - int numSparkThreads = numStreams + 1; - /* Setup the Spark config. */ - SparkConf sparkConfig = new SparkConf().setAppName("KinesisWordCount").setMaster( - "local[" + numSparkThreads + "]"); + SparkConf sparkConfig = new SparkConf().setAppName("KinesisWordCount"); /* Kinesis checkpoint interval. Same as batchInterval for this example. */ Duration checkpointInterval = batchInterval; diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala index fffd90de08..32da0858d1 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala @@ -65,6 +65,10 @@ import org.apache.log4j.Level * org.apache.spark.examples.streaming.KinesisWordCountASL mySparkStream \ * https://kinesis.us-east-1.amazonaws.com * + * + * Note that number of workers/threads should be 1 more than the number of receivers. + * This leaves one thread available for actually processing the data. + * * There is a companion helper class below called KinesisWordCountProducerASL which puts * dummy data onto the Kinesis stream. * Usage instructions for KinesisWordCountProducerASL are provided in that class definition. @@ -97,17 +101,10 @@ private object KinesisWordCountASL extends Logging { /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */ val numStreams = numShards - /* - * numSparkThreads should be 1 more thread than the number of receivers. - * This leaves one thread available for actually processing the data. - */ - val numSparkThreads = numStreams + 1 - /* Setup the and SparkConfig and StreamingContext */ /* Spark Streaming batch interval */ - val batchInterval = Milliseconds(2000) + val batchInterval = Milliseconds(2000) val sparkConfig = new SparkConf().setAppName("KinesisWordCount") - .setMaster(s"local[$numSparkThreads]") val ssc = new StreamingContext(sparkConfig, batchInterval) /* Kinesis checkpoint interval. Same as batchInterval for this example. */ |