diff options
16 files changed, 92 insertions, 98 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 26d5ce9198..8efda2074d 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -17,6 +17,26 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream} import org.apache.hadoop.fs.Path import org.apache.hadoop.conf.Configuration +/** + * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]] + * for more details on RDDs). DStreams can either be created from live data (such as, data from + * HDFS. Kafka or Flume) or it can be generated by transformation existing DStreams using operations + * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each + * DStream periodically generates a RDD, either from live data or by transforming the RDD generated + * by a parent DStream. + * + * This class contains the basic operations available on all DStreams, such as `map`, `filter` and + * `window`. In addition, [[spark.streaming.PairDStreamFunctions]] contains operations available + * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations + * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through + * implicit conversions when `spark.streaming.StreamingContext._` is imported. + * + * DStreams internally is characterized by a few basic properties: + * - A list of other DStreams that the DStream depends on + * - A time interval at which the DStream generates an RDD + * - A function that is used to generate an RDD after each time interval + */ abstract class DStream[T: ClassManifest] (@transient var ssc: StreamingContext) extends Serializable with Logging { @@ -28,7 +48,7 @@ extends Serializable with Logging { * ---------------------------------------------- */ - // Time by which the window slides in this DStream + // Time interval at which the DStream generates an RDD def slideTime: Time // List of parent DStreams on which this DStream depends on @@ -186,12 +206,12 @@ extends Serializable with Logging { dependencies.foreach(_.setGraph(graph)) } - protected[streaming] def setRememberDuration(duration: Time) { + protected[streaming] def remember(duration: Time) { if (duration != null && duration > rememberDuration) { rememberDuration = duration logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this) } - dependencies.foreach(_.setRememberDuration(parentRememberDuration)) + dependencies.foreach(_.remember(parentRememberDuration)) } /** This method checks whether the 'time' is valid wrt slideTime for generating RDD */ diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index bd8c033eab..d0a9ade61d 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -22,7 +22,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { } zeroTime = time outputStreams.foreach(_.initialize(zeroTime)) - outputStreams.foreach(_.setRememberDuration(rememberDuration)) + outputStreams.foreach(_.remember(rememberDuration)) outputStreams.foreach(_.validate) inputStreams.par.foreach(_.start()) } @@ -50,7 +50,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { batchDuration = duration } - private[streaming] def setRememberDuration(duration: Time) { + private[streaming] def remember(duration: Time) { this.synchronized { if (rememberDuration != null) { throw new Exception("Batch duration already set as " + batchDuration + diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 7a9a71f303..4a41f2f516 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -18,19 +18,39 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path import java.util.UUID -final class StreamingContext ( +/** + * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic + * information (such as, cluster URL and job name) to internally create a SparkContext, it provides + * methods used to create DStream from various input sources. + */ +class StreamingContext private ( sc_ : SparkContext, - cp_ : Checkpoint + cp_ : Checkpoint, + batchDur_ : Time ) extends Logging { - def this(sparkContext: SparkContext) = this(sparkContext, null) - - def this(master: String, frameworkName: String, sparkHome: String = null, jars: Seq[String] = Nil) = - this(new SparkContext(master, frameworkName, sparkHome, jars), null) + /** + * Creates a StreamingContext using an existing SparkContext. + * @param sparkContext Existing SparkContext + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(sparkContext: SparkContext, batchDuration: Time) = this(sparkContext, null, batchDuration) - def this(path: String) = this(null, CheckpointReader.read(path)) + /** + * Creates a StreamingContext by providing the details necessary for creating a new SparkContext. + * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). + * @param frameworkName A name for your job, to display on the cluster web UI + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(master: String, frameworkName: String, batchDuration: Time) = + this(new SparkContext(master, frameworkName), null, batchDuration) - def this(cp_ : Checkpoint) = this(null, cp_) + /** + * Recreates the StreamingContext from a checkpoint file. + * @param path Path either to the directory that was specified as the checkpoint directory, or + * to the checkpoint file 'graph' or 'graph.bk'. + */ + def this(path: String) = this(null, CheckpointReader.read(path), null) initLogging() @@ -57,7 +77,10 @@ final class StreamingContext ( cp_.graph.restoreCheckpointData() cp_.graph } else { - new DStreamGraph() + assert(batchDur_ != null, "Batch duration for streaming context cannot be null") + val newGraph = new DStreamGraph() + newGraph.setBatchDuration(batchDur_) + newGraph } } @@ -77,12 +100,8 @@ final class StreamingContext ( private[streaming] var receiverJobThread: Thread = null private[streaming] var scheduler: Scheduler = null - def setBatchDuration(duration: Time) { - graph.setBatchDuration(duration) - } - - def setRememberDuration(duration: Time) { - graph.setRememberDuration(duration) + def remember(duration: Time) { + graph.remember(duration) } def checkpoint(dir: String, interval: Time = null) { diff --git a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala deleted file mode 100644 index d2fdabd659..0000000000 --- a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala +++ /dev/null @@ -1,32 +0,0 @@ -package spark.streaming.examples - -import spark.util.IntParam -import spark.storage.StorageLevel -import spark.streaming._ -import spark.streaming.StreamingContext._ - -object CountRaw { - def main(args: Array[String]) { - if (args.length != 5) { - System.err.println("Usage: CountRaw <master> <numStreams> <host> <port> <batchMillis>") - System.exit(1) - } - - val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args - - // Create the context and set the batch size - val ssc = new StreamingContext(master, "CountRaw") - ssc.setBatchDuration(Milliseconds(batchMillis)) - - // Make sure some tasks have started on each node - ssc.sc.parallelize(1 to 1000, 1000).count() - ssc.sc.parallelize(1 to 1000, 1000).count() - ssc.sc.parallelize(1 to 1000, 1000).count() - - val rawStreams = (1 to numStreams).map(_ => - ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray - val union = new UnionDStream(rawStreams) - union.map(_.length + 2).reduce(_ + _).foreachRDD(r => println("Byte count: " + r.collect().mkString)) - ssc.start() - } -} diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala index d68611abd6..81938d30d4 100644 --- a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala +++ b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala @@ -14,10 +14,9 @@ object FileStream { System.exit(1) } - // Create the context and set the batch size - val ssc = new StreamingContext(args(0), "FileStream") - ssc.setBatchDuration(Seconds(2)) - + // Create the context + val ssc = new StreamingContext(args(0), "FileStream", Seconds(1)) + // Create the new directory val directory = new Path(args(1)) val fs = directory.getFileSystem(new Configuration()) diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala index 21a83c0fde..b7bc15a1d5 100644 --- a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala +++ b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala @@ -32,9 +32,8 @@ object FileStreamWithCheckpoint { if (!fs.exists(directory)) fs.mkdirs(directory) // Create new streaming context - val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint") - ssc_.setBatchDuration(Seconds(1)) - ssc_.checkpoint(checkpointDir, Seconds(1)) + val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint", Seconds(1)) + ssc_.checkpoint(checkpointDir) // Setup the streaming computation val inputStream = ssc_.textFileStream(directory.toString) diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala index ffbea6e55d..6cb2b4c042 100644 --- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala @@ -16,9 +16,10 @@ object GrepRaw { val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args - // Create the context and set the batch size - val ssc = new StreamingContext(master, "GrepRaw") - ssc.setBatchDuration(Milliseconds(batchMillis)) + // Create the context + val ssc = new StreamingContext(master, "GrepRaw", Milliseconds(batchMillis)) + + // Warm up the JVMs on master and slave for JIT compilation to kick in warmUp(ssc.sc) diff --git a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala index 2af51bad28..2a265d021d 100644 --- a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala +++ b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala @@ -1,9 +1,8 @@ package spark.streaming.examples import spark.RDD -import spark.streaming.StreamingContext +import spark.streaming.{Seconds, StreamingContext} import spark.streaming.StreamingContext._ -import spark.streaming.Seconds import scala.collection.mutable.SynchronizedQueue @@ -15,10 +14,9 @@ object QueueStream { System.exit(1) } - // Create the context and set the batch size - val ssc = new StreamingContext(args(0), "QueueStream") - ssc.setBatchDuration(Seconds(1)) - + // Create the context + val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1)) + // Create the queue through which RDDs can be pushed to // a QueueInputDStream val rddQueue = new SynchronizedQueue[RDD[Int]]() diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala index 0411bde1a7..fe4c2bf155 100644 --- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala @@ -20,12 +20,11 @@ object TopKWordCountRaw { val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args val k = 10 - // Create the context, set the batch size and checkpoint directory. + // Create the context, and set the checkpoint directory. // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts // periodically to HDFS - val ssc = new StreamingContext(master, "TopKWordCountRaw") - ssc.setBatchDuration(Seconds(1)) - ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1)) + val ssc = new StreamingContext(master, "TopKWordCountRaw", Seconds(1)) + ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1)) // Warm up the JVMs on master and slave for JIT compilation to kick in /*warmUp(ssc.sc)*/ diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala index 591cb141c3..867a8f42c4 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala @@ -10,9 +10,8 @@ object WordCountHdfs { System.exit(1) } - // Create the context and set the batch size - val ssc = new StreamingContext(args(0), "WordCountHdfs") - ssc.setBatchDuration(Seconds(2)) + // Create the context + val ssc = new StreamingContext(args(0), "WordCountHdfs", Seconds(2)) // Create the FileInputDStream on the directory and use the // stream to count words in new files created diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala index ba1bd1de7c..eadda60563 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala @@ -6,13 +6,13 @@ import spark.streaming.StreamingContext._ object WordCountNetwork { def main(args: Array[String]) { if (args.length < 2) { - System.err.println("Usage: WordCountNetwork <master> <hostname> <port>") + System.err.println("Usage: WordCountNetwork <master> <hostname> <port>\n" + + "In local mode, <master> should be 'local[n]' with n > 1") System.exit(1) } // Create the context and set the batch size - val ssc = new StreamingContext(args(0), "WordCountNetwork") - ssc.setBatchDuration(Seconds(2)) + val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1)) // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala index 571428c0fe..a29c81d437 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala @@ -19,12 +19,11 @@ object WordCountRaw { val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args - // Create the context, set the batch size and checkpoint directory. + // Create the context, and set the checkpoint directory. // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts // periodically to HDFS - val ssc = new StreamingContext(master, "WordCountRaw") - ssc.setBatchDuration(Seconds(1)) - ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1)) + val ssc = new StreamingContext(master, "WordCountRaw", Seconds(1)) + ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1)) // Warm up the JVMs on master and slave for JIT compilation to kick in warmUp(ssc.sc) diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala index 1a51fb66cd..68be6b7893 100644 --- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala @@ -23,9 +23,8 @@ object PageViewStream { val host = args(1) val port = args(2).toInt - // Create the context and set the batch size - val ssc = new StreamingContext("local[2]", "PageViewStream") - ssc.setBatchDuration(Seconds(1)) + // Create the context + val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1)) // Create a NetworkInputDStream on target host:port and convert each line to a PageView val pageViews = ssc.networkTextStream(host, port) diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index d0aaac0f2e..dc38ef4912 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -175,7 +175,7 @@ class BasicOperationsSuite extends TestSuiteBase { } val ssc = setupStreams(input, operation _) - ssc.setRememberDuration(rememberDuration) + ssc.remember(rememberDuration) runStreams[(Int, Int)](ssc, input.size, input.size / 2) val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 3e99440226..e98c096725 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -40,8 +40,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testServer.start() // Set up the streaming context and input streams - val ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + val ssc = new StreamingContext(master, framework, batchDuration) val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]] val outputStream = new TestOutputStream(networkStream, outputBuffer) @@ -89,8 +88,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testServer.start() // Set up the streaming context and input streams - var ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + var ssc = new StreamingContext(master, framework, batchDuration) ssc.checkpoint(checkpointDir, checkpointInterval) val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]]) @@ -137,8 +135,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } // Set up the streaming context and input streams - val ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + val ssc = new StreamingContext(master, framework, batchDuration) val filestream = ssc.textFileStream(testDir.toString) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] def output = outputBuffer.flatMap(x => x) @@ -198,8 +195,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } // Set up the streaming context and input streams - var ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + var ssc = new StreamingContext(master, framework, batchDuration) ssc.checkpoint(checkpointDir, checkpointInterval) val filestream = ssc.textFileStream(testDir.toString) var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]]) diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala index 5fb5cc504c..8cc2f8ccfc 100644 --- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala @@ -76,8 +76,7 @@ trait TestSuiteBase extends FunSuite with Logging { ): StreamingContext = { // Create StreamingContext - val ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + val ssc = new StreamingContext(master, framework, batchDuration) if (checkpointDir != null) { ssc.checkpoint(checkpointDir, checkpointInterval) } @@ -98,8 +97,7 @@ trait TestSuiteBase extends FunSuite with Logging { ): StreamingContext = { // Create StreamingContext - val ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + val ssc = new StreamingContext(master, framework, batchDuration) if (checkpointDir != null) { ssc.checkpoint(checkpointDir, checkpointInterval) } |