author    | Denny <dennybritz@gmail.com> | 2012-12-05 10:16:56 -0800
committer | Denny <dennybritz@gmail.com> | 2012-12-05 10:16:56 -0800
commit    | 15df4b0e52c1a594ed07981e6f2ee1602de7ccbb (patch)
tree      | becf272ab93d07e48c2187939f9572b0e14fb93b /streaming/src/main
parent    | 5e2b0a3bf60dead1ac7946c9984b067c926c2904 (diff)
parent    | a69a82be2682148f5d1ebbdede15a47c90eea73d (diff)
Merge branch 'dev' into kafka
Conflicts:
streaming/src/main/scala/spark/streaming/DStream.scala
Diffstat (limited to 'streaming/src/main')
18 files changed, 131 insertions, 95 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index c76c73b35a..85106b3ad8 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -17,6 +17,26 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.conf.Configuration
 
+/**
+ * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
+ * sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]]
+ * for more details on RDDs). DStreams can either be created from live data (such as data from
+ * HDFS, Kafka or Flume) or be generated by transforming existing DStreams using operations
+ * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
+ * DStream periodically generates an RDD, either from live data or by transforming the RDD generated
+ * by a parent DStream.
+ *
+ * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
+ * `window`. In addition, [[spark.streaming.PairDStreamFunctions]] contains operations available
+ * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
+ * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)]) through
+ * implicit conversions when `spark.streaming.StreamingContext._` is imported.
+ *
+ * Each DStream is internally characterized by a few basic properties:
+ *  - A list of other DStreams that the DStream depends on
+ *  - A time interval at which the DStream generates an RDD
+ *  - A function that is used to generate an RDD after each time interval
+ */
+
 case class DStreamCheckpointData(rdds: HashMap[Time, Any])
 
@@ -31,7 +51,7 @@ extends Serializable with Logging {
    * ----------------------------------------------
    */
 
-  // Time by which the window slides in this DStream
+  // Time interval at which the DStream generates an RDD
  def slideTime: Time
 
  // List of parent DStreams on which this DStream depends on
@@ -129,6 +149,8 @@ extends Serializable with Logging {
   }
 
   protected[streaming] def validate() {
+    assert(rememberDuration != null, "Remember duration is set to null")
+
     assert(
       !mustCheckpoint || checkpointInterval != null,
       "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set. " +
@@ -163,13 +185,25 @@ extends Serializable with Logging {
       checkpointInterval + "). Please set it to higher than " + checkpointInterval + "."
     )
 
+    val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds
+    logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
+    assert(
+      metadataCleanerDelay < 0 || rememberDuration < metadataCleanerDelay * 1000,
+      "It seems you are doing some DStream window operation or setting a checkpoint interval " +
+      "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
+      "than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
+      "delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " +
+      "the Java property 'spark.cleaner.delay' to more than " +
+      math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes."
+    )
+
     dependencies.foreach(_.validate())
 
     logInfo("Slide time = " + slideTime)
     logInfo("Storage level = " + storageLevel)
     logInfo("Checkpoint interval = " + checkpointInterval)
     logInfo("Remember duration = " + rememberDuration)
-    logInfo("Initialized " + this)
+    logInfo("Initialized and validated " + this)
   }
 
   protected[streaming] def setContext(s: StreamingContext) {
@@ -189,12 +223,12 @@ extends Serializable with Logging {
     dependencies.foreach(_.setGraph(graph))
   }
 
-  protected[streaming] def setRememberDuration(duration: Time) {
+  protected[streaming] def remember(duration: Time) {
     if (duration != null && duration > rememberDuration) {
       rememberDuration = duration
       logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
     }
-    dependencies.foreach(_.setRememberDuration(parentRememberDuration))
+    dependencies.foreach(_.remember(parentRememberDuration))
   }
 
   /** This method checks whether the 'time' is valid wrt slideTime for generating RDD */
@@ -331,7 +365,8 @@ extends Serializable with Logging {
         }
       }
     }
-    logInfo("Updated checkpoint data for time " + currentTime)
+    logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.size + " checkpoints, " +
+      "[" + checkpointData.mkString(",") + "]")
   }
 
   /**
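The class comment added above characterizes a DStream by its dependencies, its slide interval, and its RDD-generation function, and notes that pair operations become available through the implicit conversions in `spark.streaming.StreamingContext._`. A rough sketch of how that looks from application code (the host, port and stream names here are illustrative, not part of this commit):

    import spark.streaming.{Seconds, StreamingContext}
    // Importing StreamingContext._ provides the implicit conversion to
    // PairDStreamFunctions for DStreams of key-value pairs.
    import spark.streaming.StreamingContext._

    object DStreamSketch {
      def main(args: Array[String]) {
        // The batch interval is now passed to the constructor (see the
        // StreamingContext changes later in this diff).
        val ssc = new StreamingContext("local[2]", "DStreamSketch", Seconds(1))

        // Each DStream periodically generates an RDD, either from live data
        // or by transforming the RDD of its parent DStream.
        val lines = ssc.networkTextStream("localhost", 9999)
        val words = lines.flatMap(_.split(" "))
        val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

        counts.foreachRDD(rdd => println(rdd.take(10).mkString(", ")))
        ssc.start()
      }
    }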
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index bd8c033eab..d0a9ade61d 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -22,7 +22,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
       }
       zeroTime = time
       outputStreams.foreach(_.initialize(zeroTime))
-      outputStreams.foreach(_.setRememberDuration(rememberDuration))
+      outputStreams.foreach(_.remember(rememberDuration))
       outputStreams.foreach(_.validate)
       inputStreams.par.foreach(_.start())
     }
@@ -50,7 +50,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
     batchDuration = duration
   }
 
-  private[streaming] def setRememberDuration(duration: Time) {
+  private[streaming] def remember(duration: Time) {
     this.synchronized {
       if (rememberDuration != null) {
         throw new Exception("Batch duration already set as " + batchDuration +
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index 4c42692295..73ba877085 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -58,7 +58,7 @@ class NetworkInputTracker(
           throw new Exception("Register received for unexpected id " + streamId)
         }
         receiverInfo += ((streamId, receiverActor))
-        logInfo("Registered receiver for network stream " + streamId)
+        logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
         sender ! true
       }
      case AddBlocks(streamId, blockIds, metadata) => {
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index e09d27d34f..720e63bba0 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -4,6 +4,7 @@ import spark.streaming.StreamingContext._
 
 import spark.{Manifests, RDD, Partitioner, HashPartitioner}
 import spark.SparkContext._
+import spark.storage.StorageLevel
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -115,7 +116,10 @@ extends Serializable {
     slideTime: Time,
     partitioner: Partitioner
   ): DStream[(K, V)] = {
-    self.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), partitioner)
+    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+    self.reduceByKey(cleanedReduceFunc, partitioner)
+        .window(windowTime, slideTime)
+        .reduceByKey(cleanedReduceFunc, partitioner)
   }
 
   // This method is the efficient sliding window reduce operation,
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index 8b484e6acf..f63a9e0011 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -118,7 +118,8 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
       if (seqOfValues(0).isEmpty) {
         // If previous window's reduce value does not exist, then at least new values should exist
         if (newValues.isEmpty) {
-          throw new Exception("Neither previous window has value for key, nor new values found")
+          throw new Exception("Neither previous window has value for key, nor new values found. " +
+            "Are you sure your key class hashes consistently?")
         }
         // Reduce the new values
         newValues.reduce(reduceF) // return
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index e2dca91179..014021be61 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -17,7 +17,7 @@ extends Logging {
 
   val graph = ssc.graph
 
-  val concurrentJobs = System.getProperty("spark.stream.concurrentJobs", "1").toInt
+  val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
   val jobManager = new JobManager(ssc, concurrentJobs)
 
   val checkpointWriter = if (ssc.checkpointInterval != null && ssc.checkpointDir != null) {
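The reworked `reduceByKeyAndWindow` above now reduces each batch by key before windowing, so the window union operates on per-batch partial aggregates rather than on raw records. A minimal sketch of calling it, assuming the overload that takes a window and slide duration without an explicit partitioner (stream names, host, port and durations are illustrative):

    import spark.streaming.{Seconds, StreamingContext}
    import spark.streaming.StreamingContext._

    val ssc = new StreamingContext("local[2]", "WindowSketch", Seconds(10))
    val wordPairs = ssc.networkTextStream("localhost", 9999)
                       .flatMap(_.split(" "))
                       .map(w => (w, 1))

    // Counts over the last 60 seconds, recomputed every 10 seconds.
    // Each 10-second batch is reduced once, and the windowed result is
    // built from those partial sums.
    val windowedCounts = wordPairs.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,  // must be associative
      Seconds(60),                // window duration
      Seconds(10)                 // slide duration
    )
    windowedCounts.foreachRDD(rdd => println(rdd.collect().mkString(", ")))
    ssc.start()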
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 5e11e6d734..8153dd4567 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -17,20 +17,41 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.fs.Path
 import java.util.UUID
-
-final class StreamingContext (
+import spark.util.MetadataCleaner
+
+/**
+ * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
+ * information (such as cluster URL and job name) needed to internally create a SparkContext, it provides
+ * methods used to create DStreams from various input sources.
+ */
+class StreamingContext private (
     sc_ : SparkContext,
-    cp_ : Checkpoint
+    cp_ : Checkpoint,
+    batchDur_ : Time
   ) extends Logging {
 
-  def this(sparkContext: SparkContext) = this(sparkContext, null)
-
-  def this(master: String, frameworkName: String, sparkHome: String = null, jars: Seq[String] = Nil) =
-    this(new SparkContext(master, frameworkName, sparkHome, jars), null)
+  /**
+   * Creates a StreamingContext using an existing SparkContext.
+   * @param sparkContext Existing SparkContext
+   * @param batchDuration The time interval at which streaming data will be divided into batches
+   */
+  def this(sparkContext: SparkContext, batchDuration: Time) = this(sparkContext, null, batchDuration)
 
-  def this(path: String) = this(null, CheckpointReader.read(path))
+  /**
+   * Creates a StreamingContext by providing the details necessary for creating a new SparkContext.
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param frameworkName A name for your job, to display on the cluster web UI
+   * @param batchDuration The time interval at which streaming data will be divided into batches
+   */
+  def this(master: String, frameworkName: String, batchDuration: Time) =
+    this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
 
-  def this(cp_ : Checkpoint) = this(null, cp_)
+  /**
+   * Recreates the StreamingContext from a checkpoint file.
+   * @param path Path either to the directory that was specified as the checkpoint directory, or
+   *             to the checkpoint file 'graph' or 'graph.bk'.
+   */
+  def this(path: String) = this(null, CheckpointReader.read(path), null)
 
   initLogging()
 
@@ -57,7 +78,10 @@ final class StreamingContext (
       cp_.graph.restoreCheckpointData()
       cp_.graph
     } else {
-      new DStreamGraph()
+      assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
+      val newGraph = new DStreamGraph()
+      newGraph.setBatchDuration(batchDur_)
+      newGraph
     }
   }
 
@@ -77,12 +101,8 @@ final class StreamingContext (
   private[streaming] var receiverJobThread: Thread = null
   private[streaming] var scheduler: Scheduler = null
 
-  def setBatchDuration(duration: Time) {
-    graph.setBatchDuration(duration)
-  }
-
-  def setRememberDuration(duration: Time) {
-    graph.setRememberDuration(duration)
+  def remember(duration: Time) {
+    graph.remember(duration)
   }
 
   def checkpoint(dir: String, interval: Time = null) {
@@ -195,6 +215,10 @@ final class StreamingContext (
     inputStream
   }
 
+  def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
+    new UnionDStream[T](streams.toArray)
+  }
+
   /**
    * This function registers a InputDStream as an input stream that will be
    * started (InputDStream.start() called) to get the input data streams.
@@ -220,11 +244,8 @@ final class StreamingContext (
       "Checkpoint directory has been set, but the graph checkpointing interval has " +
       "not been set. Please use StreamingContext.checkpoint() to set the interval."
     )
-
-
   }
-
   /**
    * This function starts the execution of the streams.
    */
@@ -271,6 +292,17 @@ final class StreamingContext (
 
 
 object StreamingContext {
+
+  def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
+
+    // Set the default cleaner delay to an hour if not already set.
+    // This should be sufficient for even 1 second interval.
+    if (MetadataCleaner.getDelaySeconds < 0) {
+      MetadataCleaner.setDelaySeconds(60)
+    }
+    new SparkContext(master, frameworkName)
+  }
+
   implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
     new PairDStreamFunctions[K, V](stream)
   }
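With these changes the batch interval becomes a constructor argument, `setBatchDuration` and `setRememberDuration` disappear from the public surface (`remember` replaces the latter), and `union` combines several streams of the same element type. A sketch of the updated entry points (the cluster URL, host and port are placeholders):

    import spark.streaming.{Seconds, StreamingContext}

    // The batch interval now comes in through the constructor.
    val ssc = new StreamingContext("spark://host:7077", "MyJob", Seconds(2))

    // remember() replaces setRememberDuration(); it only ever extends how long
    // generated RDDs are kept around.
    ssc.remember(Seconds(60))

    // union() merges several input streams of the same element type.
    val streams = (1 to 3).map(_ => ssc.networkTextStream("localhost", 9999))
    val merged = ssc.union(streams)

    ssc.start()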
diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
index ce89a3f99b..e4d2a634f5 100644
--- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
@@ -2,6 +2,7 @@ package spark.streaming
 
 import spark.RDD
 import spark.rdd.UnionRDD
+import spark.storage.StorageLevel
 
 
 class WindowedDStream[T: ClassManifest](
@@ -18,6 +19,8 @@ class WindowedDStream[T: ClassManifest](
     throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " +
       "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
 
+  parent.persist(StorageLevel.MEMORY_ONLY_SER)
+
   def windowTime: Time = _windowTime
 
   override def dependencies = List(parent)
diff --git a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala
deleted file mode 100644
index d2fdabd659..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package spark.streaming.examples
-
-import spark.util.IntParam
-import spark.storage.StorageLevel
-import spark.streaming._
-import spark.streaming.StreamingContext._
-
-object CountRaw {
-  def main(args: Array[String]) {
-    if (args.length != 5) {
-      System.err.println("Usage: CountRaw <master> <numStreams> <host> <port> <batchMillis>")
-      System.exit(1)
-    }
-
-    val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
-
-    // Create the context and set the batch size
-    val ssc = new StreamingContext(master, "CountRaw")
-    ssc.setBatchDuration(Milliseconds(batchMillis))
-
-    // Make sure some tasks have started on each node
-    ssc.sc.parallelize(1 to 1000, 1000).count()
-    ssc.sc.parallelize(1 to 1000, 1000).count()
-    ssc.sc.parallelize(1 to 1000, 1000).count()
-
-    val rawStreams = (1 to numStreams).map(_ =>
-      ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
-    val union = new UnionDStream(rawStreams)
-    union.map(_.length + 2).reduce(_ + _).foreachRDD(r => println("Byte count: " + r.collect().mkString))
-    ssc.start()
-  }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
index d68611abd6..81938d30d4 100644
--- a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
@@ -14,10 +14,9 @@ object FileStream {
       System.exit(1)
     }
 
-    // Create the context and set the batch size
-    val ssc = new StreamingContext(args(0), "FileStream")
-    ssc.setBatchDuration(Seconds(2))
-
+    // Create the context
+    val ssc = new StreamingContext(args(0), "FileStream", Seconds(1))
+
     // Create the new directory
     val directory = new Path(args(1))
     val fs = directory.getFileSystem(new Configuration())
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
index 21a83c0fde..b7bc15a1d5 100644
--- a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
@@ -32,9 +32,8 @@ object FileStreamWithCheckpoint {
     if (!fs.exists(directory)) fs.mkdirs(directory)
 
     // Create new streaming context
-    val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint")
-    ssc_.setBatchDuration(Seconds(1))
-    ssc_.checkpoint(checkpointDir, Seconds(1))
+    val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint", Seconds(1))
+    ssc_.checkpoint(checkpointDir)
 
     // Setup the streaming computation
     val inputStream = ssc_.textFileStream(directory.toString)
diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
index ffbea6e55d..6cb2b4c042 100644
--- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
@@ -16,9 +16,10 @@ object GrepRaw {
 
     val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
 
-    // Create the context and set the batch size
-    val ssc = new StreamingContext(master, "GrepRaw")
-    ssc.setBatchDuration(Milliseconds(batchMillis))
+    // Create the context
+    val ssc = new StreamingContext(master, "GrepRaw", Milliseconds(batchMillis))
+
+    // Warm up the JVMs on master and slave for JIT compilation to kick in
     warmUp(ssc.sc)
diff --git a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
index 2af51bad28..2a265d021d 100644
--- a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
@@ -1,9 +1,8 @@
 package spark.streaming.examples
 
 import spark.RDD
-import spark.streaming.StreamingContext
+import spark.streaming.{Seconds, StreamingContext}
 import spark.streaming.StreamingContext._
-import spark.streaming.Seconds
 
 import scala.collection.mutable.SynchronizedQueue
 
@@ -15,10 +14,9 @@ object QueueStream {
       System.exit(1)
     }
 
-    // Create the context and set the batch size
-    val ssc = new StreamingContext(args(0), "QueueStream")
-    ssc.setBatchDuration(Seconds(1))
-
+    // Create the context
+    val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1))
+
     // Create the queue through which RDDs can be pushed to
     // a QueueInputDStream
     val rddQueue = new SynchronizedQueue[RDD[Int]]()
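The example programs above now pass the batch interval directly to the constructor, and `checkpoint(dir)` no longer needs a separate interval. For recovery, the `this(path: String)` constructor shown in the StreamingContext diff rebuilds the context and its DStream graph from the checkpoint files; a rough sketch (the checkpoint path is hypothetical):

    import spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "hdfs://namenode:9000/checkpoints/my-app"  // hypothetical path

    // First run: create a fresh context and enable checkpointing.
    val ssc = new StreamingContext("local[2]", "CheckpointSketch", Seconds(1))
    ssc.checkpoint(checkpointDir)

    // After a failure: rebuild the context and its DStream graph from the
    // checkpoint directory (or the 'graph' / 'graph.bk' file inside it).
    val recovered = new StreamingContext(checkpointDir)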
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
index 0411bde1a7..fe4c2bf155 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -20,12 +20,11 @@ object TopKWordCountRaw {
     val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
     val k = 10
 
-    // Create the context, set the batch size and checkpoint directory.
+    // Create the context, and set the checkpoint directory.
     // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
     // periodically to HDFS
-    val ssc = new StreamingContext(master, "TopKWordCountRaw")
-    ssc.setBatchDuration(Seconds(1))
-    ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+    val ssc = new StreamingContext(master, "TopKWordCountRaw", Seconds(1))
+    ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
 
     // Warm up the JVMs on master and slave for JIT compilation to kick in
     /*warmUp(ssc.sc)*/
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
index 591cb141c3..867a8f42c4 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
@@ -10,9 +10,8 @@ object WordCountHdfs {
       System.exit(1)
     }
 
-    // Create the context and set the batch size
-    val ssc = new StreamingContext(args(0), "WordCountHdfs")
-    ssc.setBatchDuration(Seconds(2))
+    // Create the context
+    val ssc = new StreamingContext(args(0), "WordCountHdfs", Seconds(2))
 
     // Create the FileInputDStream on the directory and use the
     // stream to count words in new files created
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
index ba1bd1de7c..eadda60563 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
@@ -6,13 +6,13 @@ import spark.streaming.StreamingContext._
 object WordCountNetwork {
   def main(args: Array[String]) {
     if (args.length < 2) {
-      System.err.println("Usage: WordCountNetwork <master> <hostname> <port>")
+      System.err.println("Usage: WordCountNetwork <master> <hostname> <port>\n" +
+        "In local mode, <master> should be 'local[n]' with n > 1")
       System.exit(1)
     }
 
     // Create the context and set the batch size
-    val ssc = new StreamingContext(args(0), "WordCountNetwork")
-    ssc.setBatchDuration(Seconds(2))
+    val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1))
 
     // Create a NetworkInputDStream on target ip:port and count the
     // words in input stream of \n delimited test (eg. generated by 'nc')
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
index 571428c0fe..a29c81d437 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
@@ -19,12 +19,11 @@ object WordCountRaw {
 
     val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
 
-    // Create the context, set the batch size and checkpoint directory.
+    // Create the context, and set the checkpoint directory.
     // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
     // periodically to HDFS
-    val ssc = new StreamingContext(master, "WordCountRaw")
-    ssc.setBatchDuration(Seconds(1))
-    ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+    val ssc = new StreamingContext(master, "WordCountRaw", Seconds(1))
+    ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
 
     // Warm up the JVMs on master and slave for JIT compilation to kick in
     warmUp(ssc.sc)
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index 1a51fb66cd..68be6b7893 100644
--- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -23,9 +23,8 @@ object PageViewStream {
     val host = args(1)
     val port = args(2).toInt
 
-    // Create the context and set the batch size
-    val ssc = new StreamingContext("local[2]", "PageViewStream")
-    ssc.setBatchDuration(Seconds(1))
+    // Create the context
+    val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
 
     // Create a NetworkInputDStream on target host:port and convert each line to a PageView
     val pageViews = ssc.networkTextStream(host, port)
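Two system properties are touched by this change: the scheduler now reads `spark.streaming.concurrentJobs` (previously `spark.stream.concurrentJobs`), and `createNewSparkContext` gives `spark.cleaner.delay` a default when it is unset, which `DStream.validate()` then checks against the remember duration. Both are read via `System.getProperty`, so they have to be set before the contexts are created; a sketch with illustrative values:

    import spark.streaming.{Seconds, StreamingContext}

    // Number of streaming jobs that may run concurrently (default "1").
    System.setProperty("spark.streaming.concurrentJobs", "2")
    // Cleaner delay; validate() requires it to comfortably exceed the remember
    // duration of any windowed DStream (value and units here are illustrative,
    // see the assert added in DStream.validate() above).
    System.setProperty("spark.cleaner.delay", "3600")

    val ssc = new StreamingContext("local[2]", "ConfigSketch", Seconds(1))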