diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-11-19 19:04:39 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-11-19 19:04:39 -0800 |
commit | fd11d23bb3a817dabd414bceddebc35ad731f626 (patch) | |
tree | 706feed2dfe03e8ed54ec374f69af44e8d06ddd6 /streaming | |
parent | c97ebf64377e853ab7c616a103869a4417f25954 (diff) | |
download | spark-fd11d23bb3a817dabd414bceddebc35ad731f626.tar.gz spark-fd11d23bb3a817dabd414bceddebc35ad731f626.tar.bz2 spark-fd11d23bb3a817dabd414bceddebc35ad731f626.zip |
Modified StreamingContext API to make constructor accept the batch size (since it is always needed, Patrick's suggestion). Added description to DStream and StreamingContext.
Diffstat (limited to 'streaming')
16 files changed, 92 insertions, 98 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 26d5ce9198..8efda2074d 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -17,6 +17,26 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream} import org.apache.hadoop.fs.Path import org.apache.hadoop.conf.Configuration +/** + * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]] + * for more details on RDDs). DStreams can either be created from live data (such as, data from + * HDFS. Kafka or Flume) or it can be generated by transformation existing DStreams using operations + * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each + * DStream periodically generates a RDD, either from live data or by transforming the RDD generated + * by a parent DStream. + * + * This class contains the basic operations available on all DStreams, such as `map`, `filter` and + * `window`. In addition, [[spark.streaming.PairDStreamFunctions]] contains operations available + * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations + * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through + * implicit conversions when `spark.streaming.StreamingContext._` is imported. + * + * DStreams internally is characterized by a few basic properties: + * - A list of other DStreams that the DStream depends on + * - A time interval at which the DStream generates an RDD + * - A function that is used to generate an RDD after each time interval + */ abstract class DStream[T: ClassManifest] (@transient var ssc: StreamingContext) extends Serializable with Logging { @@ -28,7 +48,7 @@ extends Serializable with Logging { * ---------------------------------------------- */ - // Time by which the window slides in this DStream + // Time interval at which the DStream generates an RDD def slideTime: Time // List of parent DStreams on which this DStream depends on @@ -186,12 +206,12 @@ extends Serializable with Logging { dependencies.foreach(_.setGraph(graph)) } - protected[streaming] def setRememberDuration(duration: Time) { + protected[streaming] def remember(duration: Time) { if (duration != null && duration > rememberDuration) { rememberDuration = duration logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this) } - dependencies.foreach(_.setRememberDuration(parentRememberDuration)) + dependencies.foreach(_.remember(parentRememberDuration)) } /** This method checks whether the 'time' is valid wrt slideTime for generating RDD */ diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index bd8c033eab..d0a9ade61d 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -22,7 +22,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { } zeroTime = time outputStreams.foreach(_.initialize(zeroTime)) - outputStreams.foreach(_.setRememberDuration(rememberDuration)) + outputStreams.foreach(_.remember(rememberDuration)) outputStreams.foreach(_.validate) inputStreams.par.foreach(_.start()) } @@ -50,7 +50,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { batchDuration = duration } - private[streaming] def setRememberDuration(duration: Time) { + private[streaming] def remember(duration: Time) { this.synchronized { if (rememberDuration != null) { throw new Exception("Batch duration already set as " + batchDuration + diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 7a9a71f303..4a41f2f516 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -18,19 +18,39 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path import java.util.UUID -final class StreamingContext ( +/** + * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic + * information (such as, cluster URL and job name) to internally create a SparkContext, it provides + * methods used to create DStream from various input sources. + */ +class StreamingContext private ( sc_ : SparkContext, - cp_ : Checkpoint + cp_ : Checkpoint, + batchDur_ : Time ) extends Logging { - def this(sparkContext: SparkContext) = this(sparkContext, null) - - def this(master: String, frameworkName: String, sparkHome: String = null, jars: Seq[String] = Nil) = - this(new SparkContext(master, frameworkName, sparkHome, jars), null) + /** + * Creates a StreamingContext using an existing SparkContext. + * @param sparkContext Existing SparkContext + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(sparkContext: SparkContext, batchDuration: Time) = this(sparkContext, null, batchDuration) - def this(path: String) = this(null, CheckpointReader.read(path)) + /** + * Creates a StreamingContext by providing the details necessary for creating a new SparkContext. + * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). + * @param frameworkName A name for your job, to display on the cluster web UI + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(master: String, frameworkName: String, batchDuration: Time) = + this(new SparkContext(master, frameworkName), null, batchDuration) - def this(cp_ : Checkpoint) = this(null, cp_) + /** + * Recreates the StreamingContext from a checkpoint file. + * @param path Path either to the directory that was specified as the checkpoint directory, or + * to the checkpoint file 'graph' or 'graph.bk'. + */ + def this(path: String) = this(null, CheckpointReader.read(path), null) initLogging() @@ -57,7 +77,10 @@ final class StreamingContext ( cp_.graph.restoreCheckpointData() cp_.graph } else { - new DStreamGraph() + assert(batchDur_ != null, "Batch duration for streaming context cannot be null") + val newGraph = new DStreamGraph() + newGraph.setBatchDuration(batchDur_) + newGraph } } @@ -77,12 +100,8 @@ final class StreamingContext ( private[streaming] var receiverJobThread: Thread = null private[streaming] var scheduler: Scheduler = null - def setBatchDuration(duration: Time) { - graph.setBatchDuration(duration) - } - - def setRememberDuration(duration: Time) { - graph.setRememberDuration(duration) + def remember(duration: Time) { + graph.remember(duration) } def checkpoint(dir: String, interval: Time = null) { diff --git a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala deleted file mode 100644 index d2fdabd659..0000000000 --- a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala +++ /dev/null @@ -1,32 +0,0 @@ -package spark.streaming.examples - -import spark.util.IntParam -import spark.storage.StorageLevel -import spark.streaming._ -import spark.streaming.StreamingContext._ - -object CountRaw { - def main(args: Array[String]) { - if (args.length != 5) { - System.err.println("Usage: CountRaw <master> <numStreams> <host> <port> <batchMillis>") - System.exit(1) - } - - val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args - - // Create the context and set the batch size - val ssc = new StreamingContext(master, "CountRaw") - ssc.setBatchDuration(Milliseconds(batchMillis)) - - // Make sure some tasks have started on each node - ssc.sc.parallelize(1 to 1000, 1000).count() - ssc.sc.parallelize(1 to 1000, 1000).count() - ssc.sc.parallelize(1 to 1000, 1000).count() - - val rawStreams = (1 to numStreams).map(_ => - ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray - val union = new UnionDStream(rawStreams) - union.map(_.length + 2).reduce(_ + _).foreachRDD(r => println("Byte count: " + r.collect().mkString)) - ssc.start() - } -} diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala index d68611abd6..81938d30d4 100644 --- a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala +++ b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala @@ -14,10 +14,9 @@ object FileStream { System.exit(1) } - // Create the context and set the batch size - val ssc = new StreamingContext(args(0), "FileStream") - ssc.setBatchDuration(Seconds(2)) - + // Create the context + val ssc = new StreamingContext(args(0), "FileStream", Seconds(1)) + // Create the new directory val directory = new Path(args(1)) val fs = directory.getFileSystem(new Configuration()) diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala index 21a83c0fde..b7bc15a1d5 100644 --- a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala +++ b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala @@ -32,9 +32,8 @@ object FileStreamWithCheckpoint { if (!fs.exists(directory)) fs.mkdirs(directory) // Create new streaming context - val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint") - ssc_.setBatchDuration(Seconds(1)) - ssc_.checkpoint(checkpointDir, Seconds(1)) + val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint", Seconds(1)) + ssc_.checkpoint(checkpointDir) // Setup the streaming computation val inputStream = ssc_.textFileStream(directory.toString) diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala index ffbea6e55d..6cb2b4c042 100644 --- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala @@ -16,9 +16,10 @@ object GrepRaw { val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args - // Create the context and set the batch size - val ssc = new StreamingContext(master, "GrepRaw") - ssc.setBatchDuration(Milliseconds(batchMillis)) + // Create the context + val ssc = new StreamingContext(master, "GrepRaw", Milliseconds(batchMillis)) + + // Warm up the JVMs on master and slave for JIT compilation to kick in warmUp(ssc.sc) diff --git a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala index 2af51bad28..2a265d021d 100644 --- a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala +++ b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala @@ -1,9 +1,8 @@ package spark.streaming.examples import spark.RDD -import spark.streaming.StreamingContext +import spark.streaming.{Seconds, StreamingContext} import spark.streaming.StreamingContext._ -import spark.streaming.Seconds import scala.collection.mutable.SynchronizedQueue @@ -15,10 +14,9 @@ object QueueStream { System.exit(1) } - // Create the context and set the batch size - val ssc = new StreamingContext(args(0), "QueueStream") - ssc.setBatchDuration(Seconds(1)) - + // Create the context + val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1)) + // Create the queue through which RDDs can be pushed to // a QueueInputDStream val rddQueue = new SynchronizedQueue[RDD[Int]]() diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala index 0411bde1a7..fe4c2bf155 100644 --- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala @@ -20,12 +20,11 @@ object TopKWordCountRaw { val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args val k = 10 - // Create the context, set the batch size and checkpoint directory. + // Create the context, and set the checkpoint directory. // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts // periodically to HDFS - val ssc = new StreamingContext(master, "TopKWordCountRaw") - ssc.setBatchDuration(Seconds(1)) - ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1)) + val ssc = new StreamingContext(master, "TopKWordCountRaw", Seconds(1)) + ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1)) // Warm up the JVMs on master and slave for JIT compilation to kick in /*warmUp(ssc.sc)*/ diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala index 591cb141c3..867a8f42c4 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala @@ -10,9 +10,8 @@ object WordCountHdfs { System.exit(1) } - // Create the context and set the batch size - val ssc = new StreamingContext(args(0), "WordCountHdfs") - ssc.setBatchDuration(Seconds(2)) + // Create the context + val ssc = new StreamingContext(args(0), "WordCountHdfs", Seconds(2)) // Create the FileInputDStream on the directory and use the // stream to count words in new files created diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala index ba1bd1de7c..eadda60563 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala @@ -6,13 +6,13 @@ import spark.streaming.StreamingContext._ object WordCountNetwork { def main(args: Array[String]) { if (args.length < 2) { - System.err.println("Usage: WordCountNetwork <master> <hostname> <port>") + System.err.println("Usage: WordCountNetwork <master> <hostname> <port>\n" + + "In local mode, <master> should be 'local[n]' with n > 1") System.exit(1) } // Create the context and set the batch size - val ssc = new StreamingContext(args(0), "WordCountNetwork") - ssc.setBatchDuration(Seconds(2)) + val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1)) // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala index 571428c0fe..a29c81d437 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala @@ -19,12 +19,11 @@ object WordCountRaw { val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args - // Create the context, set the batch size and checkpoint directory. + // Create the context, and set the checkpoint directory. // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts // periodically to HDFS - val ssc = new StreamingContext(master, "WordCountRaw") - ssc.setBatchDuration(Seconds(1)) - ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1)) + val ssc = new StreamingContext(master, "WordCountRaw", Seconds(1)) + ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1)) // Warm up the JVMs on master and slave for JIT compilation to kick in warmUp(ssc.sc) diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala index 1a51fb66cd..68be6b7893 100644 --- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala @@ -23,9 +23,8 @@ object PageViewStream { val host = args(1) val port = args(2).toInt - // Create the context and set the batch size - val ssc = new StreamingContext("local[2]", "PageViewStream") - ssc.setBatchDuration(Seconds(1)) + // Create the context + val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1)) // Create a NetworkInputDStream on target host:port and convert each line to a PageView val pageViews = ssc.networkTextStream(host, port) diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index d0aaac0f2e..dc38ef4912 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -175,7 +175,7 @@ class BasicOperationsSuite extends TestSuiteBase { } val ssc = setupStreams(input, operation _) - ssc.setRememberDuration(rememberDuration) + ssc.remember(rememberDuration) runStreams[(Int, Int)](ssc, input.size, input.size / 2) val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 3e99440226..e98c096725 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -40,8 +40,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testServer.start() // Set up the streaming context and input streams - val ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + val ssc = new StreamingContext(master, framework, batchDuration) val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]] val outputStream = new TestOutputStream(networkStream, outputBuffer) @@ -89,8 +88,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testServer.start() // Set up the streaming context and input streams - var ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + var ssc = new StreamingContext(master, framework, batchDuration) ssc.checkpoint(checkpointDir, checkpointInterval) val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]]) @@ -137,8 +135,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } // Set up the streaming context and input streams - val ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + val ssc = new StreamingContext(master, framework, batchDuration) val filestream = ssc.textFileStream(testDir.toString) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] def output = outputBuffer.flatMap(x => x) @@ -198,8 +195,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } // Set up the streaming context and input streams - var ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + var ssc = new StreamingContext(master, framework, batchDuration) ssc.checkpoint(checkpointDir, checkpointInterval) val filestream = ssc.textFileStream(testDir.toString) var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]]) diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala index 5fb5cc504c..8cc2f8ccfc 100644 --- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala @@ -76,8 +76,7 @@ trait TestSuiteBase extends FunSuite with Logging { ): StreamingContext = { // Create StreamingContext - val ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + val ssc = new StreamingContext(master, framework, batchDuration) if (checkpointDir != null) { ssc.checkpoint(checkpointDir, checkpointInterval) } @@ -98,8 +97,7 @@ trait TestSuiteBase extends FunSuite with Logging { ): StreamingContext = { // Create StreamingContext - val ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + val ssc = new StreamingContext(master, framework, batchDuration) if (checkpointDir != null) { ssc.checkpoint(checkpointDir, checkpointInterval) } |