authorTathagata Das <tathagata.das1565@gmail.com>2012-11-19 19:04:39 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2012-11-19 19:04:39 -0800
commitfd11d23bb3a817dabd414bceddebc35ad731f626 (patch)
tree706feed2dfe03e8ed54ec374f69af44e8d06ddd6
parentc97ebf64377e853ab7c616a103869a4417f25954 (diff)
Modified StreamingContext API to make the constructor accept the batch size (since it is always needed; Patrick's suggestion). Added descriptions to DStream and StreamingContext.
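The net effect of the API change is that the batch duration moves from a separate setBatchDuration call into the StreamingContext constructor. A minimal before/after sketch (the master URL and application name are placeholders; Seconds is the helper from spark.streaming used throughout the updated examples):

    // Before this commit: context created first, batch duration set afterwards
    //   val ssc = new StreamingContext("local[2]", "MyApp")
    //   ssc.setBatchDuration(Seconds(1))

    // After this commit: batch duration is a required constructor argument
    import spark.streaming.{Seconds, StreamingContext}
    import spark.streaming.StreamingContext._

    val ssc = new StreamingContext("local[2]", "MyApp", Seconds(1))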
-rw-r--r-- streaming/src/main/scala/spark/streaming/DStream.scala | 26
-rw-r--r-- streaming/src/main/scala/spark/streaming/DStreamGraph.scala | 4
-rw-r--r-- streaming/src/main/scala/spark/streaming/StreamingContext.scala | 49
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/CountRaw.scala | 32
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/FileStream.scala | 7
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala | 5
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala | 7
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/QueueStream.scala | 10
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala | 7
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala | 5
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala | 6
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala | 7
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala | 5
-rw-r--r-- streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala | 2
-rw-r--r-- streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 12
-rw-r--r-- streaming/src/test/scala/spark/streaming/TestSuiteBase.scala | 6
16 files changed, 92 insertions, 98 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 26d5ce9198..8efda2074d 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -17,6 +17,26 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
+/**
+ * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
+ * sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]]
+ * for more details on RDDs). DStreams can either be created from live data (such as data from
+ * HDFS, Kafka or Flume) or be generated by transforming existing DStreams using operations
+ * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
+ * DStream periodically generates an RDD, either from live data or by transforming the RDD generated
+ * by a parent DStream.
+ *
+ * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
+ * `window`. In addition, [[spark.streaming.PairDStreamFunctions]] contains operations available
+ * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
+ * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)]) through
+ * implicit conversions when `spark.streaming.StreamingContext._` is imported.
+ *
+ * A DStream is internally characterized by a few basic properties:
+ * - A list of other DStreams that the DStream depends on
+ * - A time interval at which the DStream generates an RDD
+ * - A function that is used to generate an RDD after each time interval
+ */
abstract class DStream[T: ClassManifest] (@transient var ssc: StreamingContext)
extends Serializable with Logging {
@@ -28,7 +48,7 @@ extends Serializable with Logging {
* ----------------------------------------------
*/
- // Time by which the window slides in this DStream
+ // Time interval at which the DStream generates an RDD
def slideTime: Time
// List of parent DStreams on which this DStream depends on
@@ -186,12 +206,12 @@ extends Serializable with Logging {
dependencies.foreach(_.setGraph(graph))
}
- protected[streaming] def setRememberDuration(duration: Time) {
+ protected[streaming] def remember(duration: Time) {
if (duration != null && duration > rememberDuration) {
rememberDuration = duration
logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
}
- dependencies.foreach(_.setRememberDuration(parentRememberDuration))
+ dependencies.foreach(_.remember(parentRememberDuration))
}
/** This method checks whether the 'time' is valid wrt slideTime for generating RDD */
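As the new class description above notes, the basic DStream operations and the pair-specific ones from PairDStreamFunctions become available once spark.streaming.StreamingContext._ is imported. A hedged sketch of a small word-count pipeline, assuming the reduceByKeyAndWindow(reduceFunc, windowTime, slideTime) signature of this era; networkTextStream and foreachRDD are taken from the examples touched in this commit, while the host, port and window sizes are arbitrary:

    import spark.streaming.{Seconds, StreamingContext}
    import spark.streaming.StreamingContext._   // implicit conversion to PairDStreamFunctions

    val ssc = new StreamingContext("local[2]", "DStreamDocExample", Seconds(1))
    val lines = ssc.networkTextStream("localhost", 9999)        // DStream[String] from live data
    val words = lines.flatMap(_.split(" "))                     // transformation of an existing DStream
    val counts = words.map(w => (w, 1))
      .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))    // pair-DStream windowed operation
    counts.foreachRDD(rdd => println(rdd.collect().mkString(", ")))
    ssc.start()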
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index bd8c033eab..d0a9ade61d 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -22,7 +22,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
zeroTime = time
outputStreams.foreach(_.initialize(zeroTime))
- outputStreams.foreach(_.setRememberDuration(rememberDuration))
+ outputStreams.foreach(_.remember(rememberDuration))
outputStreams.foreach(_.validate)
inputStreams.par.foreach(_.start())
}
@@ -50,7 +50,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
batchDuration = duration
}
- private[streaming] def setRememberDuration(duration: Time) {
+ private[streaming] def remember(duration: Time) {
this.synchronized {
if (rememberDuration != null) {
throw new Exception("Batch duration already set as " + batchDuration +
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 7a9a71f303..4a41f2f516 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -18,19 +18,39 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import java.util.UUID
-final class StreamingContext (
+/**
+ * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
+ * information (such as cluster URL and job name) needed to internally create a SparkContext, it
+ * provides methods for creating DStreams from various input sources.
+ */
+class StreamingContext private (
sc_ : SparkContext,
- cp_ : Checkpoint
+ cp_ : Checkpoint,
+ batchDur_ : Time
) extends Logging {
- def this(sparkContext: SparkContext) = this(sparkContext, null)
-
- def this(master: String, frameworkName: String, sparkHome: String = null, jars: Seq[String] = Nil) =
- this(new SparkContext(master, frameworkName, sparkHome, jars), null)
+ /**
+ * Creates a StreamingContext using an existing SparkContext.
+ * @param sparkContext Existing SparkContext
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(sparkContext: SparkContext, batchDuration: Time) = this(sparkContext, null, batchDuration)
- def this(path: String) = this(null, CheckpointReader.read(path))
+ /**
+ * Creates a StreamingContext by providing the details necessary for creating a new SparkContext.
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param frameworkName A name for your job, to display on the cluster web UI
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(master: String, frameworkName: String, batchDuration: Time) =
+ this(new SparkContext(master, frameworkName), null, batchDuration)
- def this(cp_ : Checkpoint) = this(null, cp_)
+ /**
+ * Recreates the StreamingContext from a checkpoint file.
+ * @param path Path either to the directory that was specified as the checkpoint directory, or
+ * to the checkpoint file 'graph' or 'graph.bk'.
+ */
+ def this(path: String) = this(null, CheckpointReader.read(path), null)
initLogging()
@@ -57,7 +77,10 @@ final class StreamingContext (
cp_.graph.restoreCheckpointData()
cp_.graph
} else {
- new DStreamGraph()
+ assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
+ val newGraph = new DStreamGraph()
+ newGraph.setBatchDuration(batchDur_)
+ newGraph
}
}
@@ -77,12 +100,8 @@ final class StreamingContext (
private[streaming] var receiverJobThread: Thread = null
private[streaming] var scheduler: Scheduler = null
- def setBatchDuration(duration: Time) {
- graph.setBatchDuration(duration)
- }
-
- def setRememberDuration(duration: Time) {
- graph.setRememberDuration(duration)
+ def remember(duration: Time) {
+ graph.remember(duration)
}
def checkpoint(dir: String, interval: Time = null) {
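For user code, the rename means RDD retention is now tuned with remember instead of setRememberDuration, alongside the existing checkpoint(dir, interval) method shown above. A small sketch; the directory path and durations here are arbitrary placeholders:

    val ssc = new StreamingContext("local[2]", "RememberExample", Seconds(1))
    ssc.remember(Seconds(60))                                   // keep generated RDDs for at least a minute
    ssc.checkpoint("/tmp/streaming-checkpoint", Seconds(10))    // periodic checkpointing to this directory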
diff --git a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala
deleted file mode 100644
index d2fdabd659..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package spark.streaming.examples
-
-import spark.util.IntParam
-import spark.storage.StorageLevel
-import spark.streaming._
-import spark.streaming.StreamingContext._
-
-object CountRaw {
- def main(args: Array[String]) {
- if (args.length != 5) {
- System.err.println("Usage: CountRaw <master> <numStreams> <host> <port> <batchMillis>")
- System.exit(1)
- }
-
- val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
-
- // Create the context and set the batch size
- val ssc = new StreamingContext(master, "CountRaw")
- ssc.setBatchDuration(Milliseconds(batchMillis))
-
- // Make sure some tasks have started on each node
- ssc.sc.parallelize(1 to 1000, 1000).count()
- ssc.sc.parallelize(1 to 1000, 1000).count()
- ssc.sc.parallelize(1 to 1000, 1000).count()
-
- val rawStreams = (1 to numStreams).map(_ =>
- ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
- val union = new UnionDStream(rawStreams)
- union.map(_.length + 2).reduce(_ + _).foreachRDD(r => println("Byte count: " + r.collect().mkString))
- ssc.start()
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
index d68611abd6..81938d30d4 100644
--- a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
@@ -14,10 +14,9 @@ object FileStream {
System.exit(1)
}
- // Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "FileStream")
- ssc.setBatchDuration(Seconds(2))
-
+ // Create the context
+ val ssc = new StreamingContext(args(0), "FileStream", Seconds(1))
+
// Create the new directory
val directory = new Path(args(1))
val fs = directory.getFileSystem(new Configuration())
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
index 21a83c0fde..b7bc15a1d5 100644
--- a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
@@ -32,9 +32,8 @@ object FileStreamWithCheckpoint {
if (!fs.exists(directory)) fs.mkdirs(directory)
// Create new streaming context
- val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint")
- ssc_.setBatchDuration(Seconds(1))
- ssc_.checkpoint(checkpointDir, Seconds(1))
+ val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint", Seconds(1))
+ ssc_.checkpoint(checkpointDir)
// Setup the streaming computation
val inputStream = ssc_.textFileStream(directory.toString)
diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
index ffbea6e55d..6cb2b4c042 100644
--- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
@@ -16,9 +16,10 @@ object GrepRaw {
val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
- // Create the context and set the batch size
- val ssc = new StreamingContext(master, "GrepRaw")
- ssc.setBatchDuration(Milliseconds(batchMillis))
+ // Create the context
+ val ssc = new StreamingContext(master, "GrepRaw", Milliseconds(batchMillis))
+
+ // Warm up the JVMs on master and slave for JIT compilation to kick in
warmUp(ssc.sc)
diff --git a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
index 2af51bad28..2a265d021d 100644
--- a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
@@ -1,9 +1,8 @@
package spark.streaming.examples
import spark.RDD
-import spark.streaming.StreamingContext
+import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._
-import spark.streaming.Seconds
import scala.collection.mutable.SynchronizedQueue
@@ -15,10 +14,9 @@ object QueueStream {
System.exit(1)
}
- // Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "QueueStream")
- ssc.setBatchDuration(Seconds(1))
-
+ // Create the context
+ val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1))
+
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
val rddQueue = new SynchronizedQueue[RDD[Int]]()
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
index 0411bde1a7..fe4c2bf155 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -20,12 +20,11 @@ object TopKWordCountRaw {
val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
val k = 10
- // Create the context, set the batch size and checkpoint directory.
+ // Create the context, and set the checkpoint directory.
// Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
// periodically to HDFS
- val ssc = new StreamingContext(master, "TopKWordCountRaw")
- ssc.setBatchDuration(Seconds(1))
- ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+ val ssc = new StreamingContext(master, "TopKWordCountRaw", Seconds(1))
+ ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
// Warm up the JVMs on master and slave for JIT compilation to kick in
/*warmUp(ssc.sc)*/
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
index 591cb141c3..867a8f42c4 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
@@ -10,9 +10,8 @@ object WordCountHdfs {
System.exit(1)
}
- // Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "WordCountHdfs")
- ssc.setBatchDuration(Seconds(2))
+ // Create the context
+ val ssc = new StreamingContext(args(0), "WordCountHdfs", Seconds(2))
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
index ba1bd1de7c..eadda60563 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
@@ -6,13 +6,13 @@ import spark.streaming.StreamingContext._
object WordCountNetwork {
def main(args: Array[String]) {
if (args.length < 2) {
- System.err.println("Usage: WordCountNetwork <master> <hostname> <port>")
+ System.err.println("Usage: WordCountNetwork <master> <hostname> <port>\n" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
// Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "WordCountNetwork")
- ssc.setBatchDuration(Seconds(2))
+ val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1))
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
index 571428c0fe..a29c81d437 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
@@ -19,12 +19,11 @@ object WordCountRaw {
val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
- // Create the context, set the batch size and checkpoint directory.
+ // Create the context, and set the checkpoint directory.
// Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
// periodically to HDFS
- val ssc = new StreamingContext(master, "WordCountRaw")
- ssc.setBatchDuration(Seconds(1))
- ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+ val ssc = new StreamingContext(master, "WordCountRaw", Seconds(1))
+ ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
// Warm up the JVMs on master and slave for JIT compilation to kick in
warmUp(ssc.sc)
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index 1a51fb66cd..68be6b7893 100644
--- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -23,9 +23,8 @@ object PageViewStream {
val host = args(1)
val port = args(2).toInt
- // Create the context and set the batch size
- val ssc = new StreamingContext("local[2]", "PageViewStream")
- ssc.setBatchDuration(Seconds(1))
+ // Create the context
+ val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
// Create a NetworkInputDStream on target host:port and convert each line to a PageView
val pageViews = ssc.networkTextStream(host, port)
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index d0aaac0f2e..dc38ef4912 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -175,7 +175,7 @@ class BasicOperationsSuite extends TestSuiteBase {
}
val ssc = setupStreams(input, operation _)
- ssc.setRememberDuration(rememberDuration)
+ ssc.remember(rememberDuration)
runStreams[(Int, Int)](ssc, input.size, input.size / 2)
val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 3e99440226..e98c096725 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -40,8 +40,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ val ssc = new StreamingContext(master, framework, batchDuration)
val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
@@ -89,8 +88,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()
// Set up the streaming context and input streams
- var ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ var ssc = new StreamingContext(master, framework, batchDuration)
ssc.checkpoint(checkpointDir, checkpointInterval)
val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]])
@@ -137,8 +135,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ val ssc = new StreamingContext(master, framework, batchDuration)
val filestream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
@@ -198,8 +195,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
// Set up the streaming context and input streams
- var ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ var ssc = new StreamingContext(master, framework, batchDuration)
ssc.checkpoint(checkpointDir, checkpointInterval)
val filestream = ssc.textFileStream(testDir.toString)
var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]])
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index 5fb5cc504c..8cc2f8ccfc 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -76,8 +76,7 @@ trait TestSuiteBase extends FunSuite with Logging {
): StreamingContext = {
// Create StreamingContext
- val ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ val ssc = new StreamingContext(master, framework, batchDuration)
if (checkpointDir != null) {
ssc.checkpoint(checkpointDir, checkpointInterval)
}
@@ -98,8 +97,7 @@ trait TestSuiteBase extends FunSuite with Logging {
): StreamingContext = {
// Create StreamingContext
- val ssc = new StreamingContext(master, framework)
- ssc.setBatchDuration(batchDuration)
+ val ssc = new StreamingContext(master, framework, batchDuration)
if (checkpointDir != null) {
ssc.checkpoint(checkpointDir, checkpointInterval)
}