author    Denny <dennybritz@gmail.com>  2012-12-05 10:16:56 -0800
committer Denny <dennybritz@gmail.com>  2012-12-05 10:16:56 -0800
commit    15df4b0e52c1a594ed07981e6f2ee1602de7ccbb (patch)
tree      becf272ab93d07e48c2187939f9572b0e14fb93b /streaming
parent    5e2b0a3bf60dead1ac7946c9984b067c926c2904 (diff)
parent    a69a82be2682148f5d1ebbdede15a47c90eea73d (diff)
Merge branch 'dev' into kafka
Conflicts:
	streaming/src/main/scala/spark/streaming/DStream.scala
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala | 45
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamGraph.scala | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala | 6
-rw-r--r--  streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala | 70
-rw-r--r--  streaming/src/main/scala/spark/streaming/WindowedDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/CountRaw.scala | 32
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/FileStream.scala | 7
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala | 5
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala | 7
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/QueueStream.scala | 10
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala | 7
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala | 5
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala | 6
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala | 7
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala | 5
-rw-r--r--  streaming/src/test/resources/log4j.properties | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 12
-rw-r--r--  streaming/src/test/scala/spark/streaming/TestSuiteBase.scala | 6
22 files changed, 139 insertions(+), 109 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index c76c73b35a..85106b3ad8 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -17,6 +17,26 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
+/**
+ * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
+ * sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]]
+ * for more details on RDDs). DStreams can either be created from live data (such as data from
+ * HDFS, Kafka or Flume) or be generated by transforming existing DStreams using operations
+ * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
+ * DStream periodically generates an RDD, either from live data or by transforming the RDD generated
+ * by a parent DStream.
+ *
+ * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
+ * `window`. In addition, [[spark.streaming.PairDStreamFunctions]] contains operations available
+ * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
+ * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)]) through
+ * implicit conversions when `spark.streaming.StreamingContext._` is imported.
+ *
+ * Internally, each DStream is characterized by a few basic properties:
+ * - A list of other DStreams that the DStream depends on
+ * - A time interval at which the DStream generates an RDD
+ * - A function that is used to generate an RDD after each time interval
+ */
case class DStreamCheckpointData(rdds: HashMap[Time, Any])
@@ -31,7 +51,7 @@ extends Serializable with Logging {
* ----------------------------------------------
*/
- // Time by which the window slides in this DStream
+ // Time interval at which the DStream generates an RDD
def slideTime: Time
// List of parent DStreams on which this DStream depends on
@@ -129,6 +149,8 @@ extends Serializable with Logging {
}
protected[streaming] def validate() {
+ assert(rememberDuration != null, "Remember duration is set to null")
+
assert(
!mustCheckpoint || checkpointInterval != null,
"The checkpoint interval for " + this.getClass.getSimpleName + " has not been set. " +
@@ -163,13 +185,25 @@ extends Serializable with Logging {
checkpointInterval + "). Please set it to higher than " + checkpointInterval + "."
)
+ val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds
+ logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
+ assert(
+ metadataCleanerDelay < 0 || rememberDuration < metadataCleanerDelay * 1000,
+ "It seems you are doing some DStream window operation or setting a checkpoint interval " +
+ "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
+ "than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
+ "delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " +
+ "the Java property 'spark.cleaner.delay' to more than " +
+ math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes."
+ )
+
dependencies.foreach(_.validate())
logInfo("Slide time = " + slideTime)
logInfo("Storage level = " + storageLevel)
logInfo("Checkpoint interval = " + checkpointInterval)
logInfo("Remember duration = " + rememberDuration)
- logInfo("Initialized " + this)
+ logInfo("Initialized and validated " + this)
}
protected[streaming] def setContext(s: StreamingContext) {
@@ -189,12 +223,12 @@ extends Serializable with Logging {
dependencies.foreach(_.setGraph(graph))
}
- protected[streaming] def setRememberDuration(duration: Time) {
+ protected[streaming] def remember(duration: Time) {
if (duration != null && duration > rememberDuration) {
rememberDuration = duration
logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
}
- dependencies.foreach(_.setRememberDuration(parentRememberDuration))
+ dependencies.foreach(_.remember(parentRememberDuration))
}
/** This method checks whether the 'time' is valid wrt slideTime for generating RDD */
@@ -331,7 +365,8 @@ extends Serializable with Logging {
}
}
}
- logInfo("Updated checkpoint data for time " + currentTime)
+ logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.size + " checkpoints, "
+ + "[" + checkpointData.mkString(",") + "]")
}
/**
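For orientation, a minimal sketch of the operations the new DStream scaladoc describes: map/flatMap/filter work on any DStream, and pair operations such as reduceByKeyAndWindow become available through the implicit conversions pulled in by importing spark.streaming.StreamingContext._. The host, port and durations below are placeholders, and the (reduceFunc, windowTime, slideTime, partitioner) overload visible in this diff is assumed.

    import spark.streaming.{Seconds, StreamingContext}
    import spark.streaming.StreamingContext._   // implicit conversion to PairDStreamFunctions
    import spark.HashPartitioner

    object DStreamSketch {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "DStreamSketch", Seconds(1))
        val lines = ssc.networkTextStream("localhost", 9999)        // placeholder source
        val words = lines.flatMap(_.split(" ")).filter(!_.isEmpty)
        // (word, 1) pairs pick up reduceByKeyAndWindow via the implicit conversion
        val counts = words.map(w => (w, 1))
          .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10), new HashPartitioner(2))
        counts.foreachRDD(rdd => println(rdd.collect().mkString(", ")))
        ssc.start()
      }
    }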
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index bd8c033eab..d0a9ade61d 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -22,7 +22,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
zeroTime = time
outputStreams.foreach(_.initialize(zeroTime))
- outputStreams.foreach(_.setRememberDuration(rememberDuration))
+ outputStreams.foreach(_.remember(rememberDuration))
outputStreams.foreach(_.validate)
inputStreams.par.foreach(_.start())
}
@@ -50,7 +50,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
batchDuration = duration
}
- private[streaming] def setRememberDuration(duration: Time) {
+ private[streaming] def remember(duration: Time) {
this.synchronized {
if (rememberDuration != null) {
throw new Exception("Batch duration already set as " + batchDuration +
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index 4c42692295..73ba877085 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -58,7 +58,7 @@ class NetworkInputTracker(
throw new Exception("Register received for unexpected id " + streamId)
}
receiverInfo += ((streamId, receiverActor))
- logInfo("Registered receiver for network stream " + streamId)
+ logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
sender ! true
}
case AddBlocks(streamId, blockIds, metadata) => {
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index e09d27d34f..720e63bba0 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -4,6 +4,7 @@ import spark.streaming.StreamingContext._
import spark.{Manifests, RDD, Partitioner, HashPartitioner}
import spark.SparkContext._
+import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
@@ -115,7 +116,10 @@ extends Serializable {
slideTime: Time,
partitioner: Partitioner
): DStream[(K, V)] = {
- self.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), partitioner)
+ val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+ self.reduceByKey(cleanedReduceFunc, partitioner)
+ .window(windowTime, slideTime)
+ .reduceByKey(cleanedReduceFunc, partitioner)
}
// This method is the efficient sliding window reduce operation,
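The rewrite above reassociates the naive implementation: each batch is reduced by key first, the already-reduced per-batch RDDs are then windowed, and the window is reduced again, so far less data is carried inside the window. Roughly, it composes the following user-level steps (pairs, reduceFunc, the durations and the partitioner are illustrative, with the StreamingContext._ implicits in scope):

    // pairs: DStream[(String, Int)], reduceFunc: (Int, Int) => Int (e.g. _ + _),
    // partitioner: spark.Partitioner -- all hypothetical names for this sketch
    val perBatch  = pairs.reduceByKey(reduceFunc, partitioner)    // shrink each batch first
    val windowed  = perBatch.window(Seconds(30), Seconds(10))     // union of the smaller per-batch RDDs
    val perWindow = windowed.reduceByKey(reduceFunc, partitioner) // combine across the whole window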
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index 8b484e6acf..f63a9e0011 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -118,7 +118,8 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
if (seqOfValues(0).isEmpty) {
// If previous window's reduce value does not exist, then at least new values should exist
if (newValues.isEmpty) {
- throw new Exception("Neither previous window has value for key, nor new values found")
+ throw new Exception("Neither the previous window had a value for this key, nor were new values found. " +
+ "Are you sure your key class hashes consistently?")
}
// Reduce the new values
newValues.reduce(reduceF) // return
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index e2dca91179..014021be61 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -17,7 +17,7 @@ extends Logging {
val graph = ssc.graph
- val concurrentJobs = System.getProperty("spark.stream.concurrentJobs", "1").toInt
+ val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
val jobManager = new JobManager(ssc, concurrentJobs)
val checkpointWriter = if (ssc.checkpointInterval != null && ssc.checkpointDir != null) {
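Because the property key changed, jobs that still set the old spark.stream.concurrentJobs would silently fall back to the default of one concurrent job. A sketch of using the renamed key (the value 2 is only an example):

    // Set before the StreamingContext (and hence the Scheduler) is created
    System.setProperty("spark.streaming.concurrentJobs", "2")
    val ssc = new StreamingContext("local[4]", "ConcurrentJobsSketch", Seconds(1))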
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 5e11e6d734..8153dd4567 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -17,20 +17,41 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import java.util.UUID
-
-final class StreamingContext (
+import spark.util.MetadataCleaner
+
+/**
+ * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
+ * information (such as the cluster URL and job name) needed to create a SparkContext internally, it provides
+ * methods used to create DStreams from various input sources.
+ */
+class StreamingContext private (
sc_ : SparkContext,
- cp_ : Checkpoint
+ cp_ : Checkpoint,
+ batchDur_ : Time
) extends Logging {
- def this(sparkContext: SparkContext) = this(sparkContext, null)
-
- def this(master: String, frameworkName: String, sparkHome: String = null, jars: Seq[String] = Nil) =
- this(new SparkContext(master, frameworkName, sparkHome, jars), null)
+ /**
+ * Creates a StreamingContext using an existing SparkContext.
+ * @param sparkContext Existing SparkContext
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(sparkContext: SparkContext, batchDuration: Time) = this(sparkContext, null, batchDuration)
- def this(path: String) = this(null, CheckpointReader.read(path))
+ /**
+ * Creates a StreamingContext by providing the details necessary for creating a new SparkContext.
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param frameworkName A name for your job, to display on the cluster web UI
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(master: String, frameworkName: String, batchDuration: Time) =
+ this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
- def this(cp_ : Checkpoint) = this(null, cp_)
+ /**
+ * Recreates the StreamingContext from a checkpoint file.
+ * @param path Path either to the directory that was specified as the checkpoint directory, or
+ * to the checkpoint file 'graph' or 'graph.bk'.
+ */
+ def this(path: String) = this(null, CheckpointReader.read(path), null)
initLogging()
@@ -57,7 +78,10 @@ final class StreamingContext (
cp_.graph.restoreCheckpointData()
cp_.graph
} else {
- new DStreamGraph()
+ assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
+ val newGraph = new DStreamGraph()
+ newGraph.setBatchDuration(batchDur_)
+ newGraph
}
}
@@ -77,12 +101,8 @@ final class StreamingContext (
private[streaming] var receiverJobThread: Thread = null
private[streaming] var scheduler: Scheduler = null
- def setBatchDuration(duration: Time) {
- graph.setBatchDuration(duration)
- }
-
- def setRememberDuration(duration: Time) {
- graph.setRememberDuration(duration)
+ def remember(duration: Time) {
+ graph.remember(duration)
}
def checkpoint(dir: String, interval: Time = null) {
@@ -195,6 +215,10 @@ final class StreamingContext (
inputStream
}
+ def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
+ new UnionDStream[T](streams.toArray)
+ }
+
/**
* This function registers an InputDStream as an input stream that will be
* started (InputDStream.start() called) to get the input data streams.
@@ -220,11 +244,8 @@ final class StreamingContext (
"Checkpoint directory has been set, but the graph checkpointing interval has " +
"not been set. Please use StreamingContext.checkpoint() to set the interval."
)
-
-
}
-
/**
* This function starts the execution of the streams.
*/
@@ -271,6 +292,17 @@ final class StreamingContext (
object StreamingContext {
+
+ def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
+
+ // Set the default cleaner delay to an hour if not already set.
+ // This should be sufficient even for a 1 second batch interval.
+ if (MetadataCleaner.getDelaySeconds < 0) {
+ MetadataCleaner.setDelaySeconds(60)
+ }
+ new SparkContext(master, frameworkName)
+ }
+
implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
new PairDStreamFunctions[K, V](stream)
}
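Taken together, the StreamingContext changes move the batch duration into the constructor, replace setBatchDuration/setRememberDuration with the constructor argument plus remember(), and add union() for merging several input streams. A sketch of the reworked API as it appears in this diff (hosts, ports, paths and durations are placeholders):

    import spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext("local[4]", "ApiSketch", Seconds(2))
    ssc.checkpoint("/tmp/streaming-ckpt", Seconds(10))   // illustrative checkpoint dir and interval
    ssc.remember(Seconds(60))                            // replaces setRememberDuration
    // For long remember/window durations, validate() also expects the cleaner delay to cover
    // them, e.g. System.setProperty("spark.cleaner.delay", ...) as the assert message suggests.

    val s1 = ssc.networkTextStream("host1", 9999)
    val s2 = ssc.networkTextStream("host2", 9999)
    val merged = ssc.union(Seq(s1, s2))                  // the new union() helper
    merged.foreachRDD(rdd => println("records in batch: " + rdd.count()))
    ssc.start()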
diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
index ce89a3f99b..e4d2a634f5 100644
--- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
@@ -2,6 +2,7 @@ package spark.streaming
import spark.RDD
import spark.rdd.UnionRDD
+import spark.storage.StorageLevel
class WindowedDStream[T: ClassManifest](
@@ -18,6 +19,8 @@ class WindowedDStream[T: ClassManifest](
throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
+ parent.persist(StorageLevel.MEMORY_ONLY_SER)
+
def windowTime: Time = _windowTime
override def dependencies = List(parent)
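Persisting the parent here is reasonable because overlapping windows reference the same parent RDDs: with a 30 second window sliding every 10 seconds, each parent RDD belongs to roughly three windows, so caching it serialized in memory avoids recomputation. A small self-contained sketch (host and port are placeholders):

    import spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext("local[2]", "WindowSketch", Seconds(10))
    val lines = ssc.networkTextStream("localhost", 9999)
    // Each 10s batch RDD is reused by about three overlapping 30s windows; WindowedDStream
    // now persists its parent as MEMORY_ONLY_SER so those RDDs are not recomputed.
    val windowed = lines.window(Seconds(30), Seconds(10))
    windowed.foreachRDD(rdd => println("elements in window: " + rdd.count()))
    ssc.start()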
diff --git a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala
deleted file mode 100644
index d2fdabd659..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package spark.streaming.examples
-
-import spark.util.IntParam
-import spark.storage.StorageLevel
-import spark.streaming._
-import spark.streaming.StreamingContext._
-
-object CountRaw {
- def main(args: Array[String]) {
- if (args.length != 5) {
- System.err.println("Usage: CountRaw <master> <numStreams> <host> <port> <batchMillis>")
- System.exit(1)
- }
-
- val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
-
- // Create the context and set the batch size
- val ssc = new StreamingContext(master, "CountRaw")
- ssc.setBatchDuration(Milliseconds(batchMillis))
-
- // Make sure some tasks have started on each node
- ssc.sc.parallelize(1 to 1000, 1000).count()
- ssc.sc.parallelize(1 to 1000, 1000).count()
- ssc.sc.parallelize(1 to 1000, 1000).count()
-
- val rawStreams = (1 to numStreams).map(_ =>
- ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
- val union = new UnionDStream(rawStreams)
- union.map(_.length + 2).reduce(_ + _).foreachRDD(r => println("Byte count: " + r.collect().mkString))
- ssc.start()
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
index d68611abd6..81938d30d4 100644
--- a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
@@ -14,10 +14,9 @@ object FileStream {
System.exit(1)
}
- // Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "FileStream")
- ssc.setBatchDuration(Seconds(2))
-
+ // Create the context
+ val ssc = new StreamingContext(args(0), "FileStream", Seconds(1))
+
// Create the new directory
val directory = new Path(args(1))
val fs = directory.getFileSystem(new Configuration())
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
index 21a83c0fde..b7bc15a1d5 100644
--- a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
@@ -32,9 +32,8 @@ object FileStreamWithCheckpoint {
if (!fs.exists(directory)) fs.mkdirs(directory)
// Create new streaming context
- val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint")
- ssc_.setBatchDuration(Seconds(1))
- ssc_.checkpoint(checkpointDir, Seconds(1))
+ val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint", Seconds(1))
+ ssc_.checkpoint(checkpointDir)
// Setup the streaming computation
val inputStream = ssc_.textFileStream(directory.toString)
diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
index ffbea6e55d..6cb2b4c042 100644
--- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
@@ -16,9 +16,10 @@ object GrepRaw {
val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
- // Create the context and set the batch size
- val ssc = new StreamingContext(master, "GrepRaw")
- ssc.setBatchDuration(Milliseconds(batchMillis))
+ // Create the context
+ val ssc = new StreamingContext(master, "GrepRaw", Milliseconds(batchMillis))
+
+ // Warm up the JVMs on master and slave for JIT compilation to kick in
warmUp(ssc.sc)
diff --git a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
index 2af51bad28..2a265d021d 100644
--- a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
@@ -1,9 +1,8 @@
package spark.streaming.examples
import spark.RDD
-import spark.streaming.StreamingContext
+import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._
-import spark.streaming.Seconds
import scala.collection.mutable.SynchronizedQueue
@@ -15,10 +14,9 @@ object QueueStream {
System.exit(1)
}
- // Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "QueueStream")
- ssc.setBatchDuration(Seconds(1))
-
+ // Create the context
+ val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1))
+
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
val rddQueue = new SynchronizedQueue[RDD[Int]]()
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
index 0411bde1a7..fe4c2bf155 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -20,12 +20,11 @@ object TopKWordCountRaw {
val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
val k = 10
- // Create the context, set the batch size and checkpoint directory.
+ // Create the context, and set the checkpoint directory.
// Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
// periodically to HDFS
- val ssc = new StreamingContext(master, "TopKWordCountRaw")
- ssc.setBatchDuration(Seconds(1))
- ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+ val ssc = new StreamingContext(master, "TopKWordCountRaw", Seconds(1))
+ ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
// Warm up the JVMs on master and slave for JIT compilation to kick in
/*warmUp(ssc.sc)*/
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
index 591cb141c3..867a8f42c4 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
@@ -10,9 +10,8 @@ object WordCountHdfs {
System.exit(1)
}
- // Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "WordCountHdfs")
- ssc.setBatchDuration(Seconds(2))
+ // Create the context
+ val ssc = new StreamingContext(args(0), "WordCountHdfs", Seconds(2))
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
index ba1bd1de7c..eadda60563 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
@@ -6,13 +6,13 @@ import spark.streaming.StreamingContext._
object WordCountNetwork {
def main(args: Array[String]) {
if (args.length < 2) {
- System.err.println("Usage: WordCountNetwork <master> <hostname> <port>")
+ System.err.println("Usage: WordCountNetwork <master> <hostname> <port>\n" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
// Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "WordCountNetwork")
- ssc.setBatchDuration(Seconds(2))
+ val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1))
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
index 571428c0fe..a29c81d437 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
@@ -19,12 +19,11 @@ object WordCountRaw {
val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
- // Create the context, set the batch size and checkpoint directory.
+ // Create the context, and set the checkpoint directory.
// Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
// periodically to HDFS
- val ssc = new StreamingContext(master, "WordCountRaw")
- ssc.setBatchDuration(Seconds(1))
- ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+ val ssc = new StreamingContext(master, "WordCountRaw", Seconds(1))
+ ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
// Warm up the JVMs on master and slave for JIT compilation to kick in
warmUp(ssc.sc)
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index 1a51fb66cd..68be6b7893 100644
--- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -23,9 +23,8 @@ object PageViewStream {
val host = args(1)
val port = args(2).toInt
- // Create the context and set the batch size
- val ssc = new StreamingContext("local[2]", "PageViewStream")
- ssc.setBatchDuration(Seconds(1))
+ // Create the context
+ val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
// Create a NetworkInputDStream on target host:port and convert each line to a PageView
val pageViews = ssc.networkTextStream(host, port)
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index 33774b463d..02fe16866e 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,5 +1,5 @@
# Set everything to be logged to the console
-log4j.rootCategory=INFO, console
+log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index d0aaac0f2e..dc38ef4912 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -175,7 +175,7 @@ class BasicOperationsSuite extends TestSuiteBase {
}
val ssc = setupStreams(input, operation _)
- ssc.setRememberDuration(rememberDuration)
+ ssc.remember(rememberDuration)
runStreams[(Int, Int)](ssc, input.size, input.size / 2)
val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 3e99440226..e98c096725 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -40,8 +40,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ val ssc = new StreamingContext(master, framework, batchDuration)
val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
@@ -89,8 +88,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()
// Set up the streaming context and input streams
- var ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ var ssc = new StreamingContext(master, framework, batchDuration)
ssc.checkpoint(checkpointDir, checkpointInterval)
val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]])
@@ -137,8 +135,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ val ssc = new StreamingContext(master, framework, batchDuration)
val filestream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
@@ -198,8 +195,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
// Set up the streaming context and input streams
- var ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ var ssc = new StreamingContext(master, framework, batchDuration)
ssc.checkpoint(checkpointDir, checkpointInterval)
val filestream = ssc.textFileStream(testDir.toString)
var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]])
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index 5fb5cc504c..8cc2f8ccfc 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -76,8 +76,7 @@ trait TestSuiteBase extends FunSuite with Logging {
): StreamingContext = {
// Create StreamingContext
- val ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ val ssc = new StreamingContext(master, framework, batchDuration)
if (checkpointDir != null) {
ssc.checkpoint(checkpointDir, checkpointInterval)
}
@@ -98,8 +97,7 @@ trait TestSuiteBase extends FunSuite with Logging {
): StreamingContext = {
// Create StreamingContext
- val ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ val ssc = new StreamingContext(master, framework, batchDuration)
if (checkpointDir != null) {
ssc.checkpoint(checkpointDir, checkpointInterval)
}