author     Tathagata Das <tathagata.das1565@gmail.com>  2012-10-19 12:11:44 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2012-10-19 12:11:44 -0700
commit     6d5eb4b40ccad150c967fee8557a4e5d5664b4bd (patch)
tree       ee58380633c8064677917b70a44a3da07b47e1fc /streaming
parent     b760d6426a7fa2a6d115cefc786aa766b9419bd6 (diff)
Added functionality to forget RDDs from DStreams.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/Checkpoint.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala | 90
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamGraph.scala | 40
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala | 9
-rw-r--r--  streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala | 14
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala | 5
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala | 34
-rw-r--r--  streaming/src/main/scala/spark/streaming/WindowedDStream.scala | 6
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala | 9
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala | 87
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala | 7
11 files changed, 224 insertions, 79 deletions
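
The heart of the change is in DStream.scala below: each DStream now tracks a forgetTime and, after every batch, drops entries from its generatedRDDs map that are older than (current time - forgetTime), recursing into its dependencies. A minimal standalone sketch of that bookkeeping, using simplified stand-ins for the real spark.streaming.Time and RDD types (illustrative only, not the Spark classes):

import scala.collection.mutable.HashMap

// Illustrative stand-in for spark.streaming.Time (an instant/duration in milliseconds).
case class SimpleTime(millis: Long) {
  def -(that: SimpleTime) = SimpleTime(millis - that.millis)
  def <(that: SimpleTime) = millis < that.millis
}

// Illustrative stand-in for a DStream that models only the forgetting logic.
class ForgettingStream[T](var forgetTime: SimpleTime) {
  // Mirrors DStream.generatedRDDs: one generated value per batch time.
  val generatedRDDs = new HashMap[SimpleTime, T]()

  // Mirrors DStream.forgetOldRDDs: remove everything strictly older than (time - forgetTime).
  def forgetOldRDDs(time: SimpleTime): Int = {
    val old = generatedRDDs.keys.filter(t => t < (time - forgetTime)).toSeq
    old.foreach(generatedRDDs.remove)
    old.size
  }
}
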
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index f7936bdc5f..23fd0f2434 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -14,7 +14,6 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) ext
val sparkHome = ssc.sc.sparkHome
val jars = ssc.sc.jars
val graph = ssc.graph
- val batchDuration = ssc.batchDuration
val checkpointFile = ssc.checkpointFile
val checkpointInterval = ssc.checkpointInterval
@@ -24,7 +23,6 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) ext
assert(master != null, "Checkpoint.master is null")
assert(framework != null, "Checkpoint.framework is null")
assert(graph != null, "Checkpoint.graph is null")
- assert(batchDuration != null, "Checkpoint.batchDuration is null")
assert(checkpointTime != null, "Checkpoint.checkpointTime is null")
}
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 0a43a042d0..645636b603 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -39,22 +39,30 @@ extends Serializable with Logging {
* ---------------------------------------
*/
- // Variable to store the RDDs generated earlier in time
- protected val generatedRDDs = new HashMap[Time, RDD[T]] ()
+ // RDDs generated, marked as protected[streaming] so that testsuites can access it
+ protected[streaming] val generatedRDDs = new HashMap[Time, RDD[T]] ()
- // Variable to be set to the first time seen by the DStream (effective time zero)
+ // Time zero for the DStream
protected var zeroTime: Time = null
- // Variable to specify storage level
+ // Time after which RDDs will be forgotten
+ protected var forgetTime: Time = null
+
+ // Storage level of the RDDs in the stream
protected var storageLevel: StorageLevel = StorageLevel.NONE
// Checkpoint level and checkpoint interval
protected var checkpointLevel: StorageLevel = StorageLevel.NONE // NONE means don't checkpoint
protected var checkpointInterval: Time = null
- // Reference to whole DStream graph, so that checkpointing process can lock it
+ // Reference to whole DStream graph
protected var graph: DStreamGraph = null
+ def isInitialized = (zeroTime != null)
+
+ // Time gap for forgetting old RDDs (i.e. removing them from generatedRDDs)
+ def parentForgetTime = forgetTime
+
// Change this RDD's storage level
def persist(
storageLevel: StorageLevel,
@@ -79,8 +87,6 @@ extends Serializable with Logging {
// Turn on the default caching level for this RDD
def cache(): DStream[T] = persist()
- def isInitialized() = (zeroTime != null)
-
/**
* This method initializes the DStream by setting the "zero" time, based on which
* the validity of future times is calculated. This method also recursively initializes
@@ -91,31 +97,43 @@ extends Serializable with Logging {
throw new Exception("ZeroTime is already initialized, cannot initialize it again")
}
zeroTime = time
- logInfo(this + " initialized")
dependencies.foreach(_.initialize(zeroTime))
+ logInfo("Initialized " + this)
}
protected[streaming] def setContext(s: StreamingContext) {
if (ssc != null && ssc != s) {
- throw new Exception("Context is already set, cannot set it again")
+ throw new Exception("Context is already set in " + this + ", cannot set it again")
}
ssc = s
- logInfo("Set context for " + this.getClass.getSimpleName)
+ logInfo("Set context for " + this)
dependencies.foreach(_.setContext(ssc))
}
protected[streaming] def setGraph(g: DStreamGraph) {
if (graph != null && graph != g) {
- throw new Exception("Graph is already set, cannot set it again")
+ throw new Exception("Graph is already set in " + this + ", cannot set it again")
}
graph = g
dependencies.foreach(_.setGraph(graph))
}
+ protected[streaming] def setForgetTime(time: Time = slideTime) {
+ if (time == null) {
+ throw new Exception("Time gap for forgetting RDDs cannot be set to null for " + this)
+ } else if (forgetTime != null && time < forgetTime) {
+ throw new Exception("Time gap for forgetting RDDs cannot be reduced from " + forgetTime
+ + " to " + time + " for " + this)
+ }
+ forgetTime = time
+ dependencies.foreach(_.setForgetTime(parentForgetTime))
+ logInfo("Time gap for forgetting RDDs set to " + forgetTime + " for " + this)
+ }
+
/** This method checks whether the 'time' is valid wrt slideTime for generating RDD */
protected def isTimeValid(time: Time): Boolean = {
if (!isInitialized) {
- throw new Exception (this.toString + " has not been initialized")
+ throw new Exception (this + " has not been initialized")
} else if (time < zeroTime || ! (time - zeroTime).isMultipleOf(slideTime)) {
false
} else {
@@ -178,6 +196,21 @@ extends Serializable with Logging {
}
}
+ def forgetOldRDDs(time: Time) {
+ val keys = generatedRDDs.keys
+ var numForgotten = 0
+
+ keys.foreach(t => {
+ if (t < (time - forgetTime)) {
+ generatedRDDs.remove(t)
+ numForgotten += 1
+ //logInfo("Forgot RDD of time " + t + " from " + this)
+ }
+ })
+ logInfo("Forgot " + numForgotten + " RDDs from " + this)
+ dependencies.foreach(_.forgetOldRDDs(time))
+ }
+
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
logDebug(this.getClass().getSimpleName + ".writeObject used")
@@ -257,7 +290,7 @@ extends Serializable with Logging {
new TransformedDStream(this, ssc.sc.clean(transformFunc))
}
- def toBlockingQueue = {
+ def toBlockingQueue() = {
val queue = new ArrayBlockingQueue[RDD[T]](10000)
this.foreachRDD(rdd => {
queue.add(rdd)
@@ -265,7 +298,7 @@ extends Serializable with Logging {
queue
}
- def print() = {
+ def print() {
def foreachFunc = (rdd: RDD[T], time: Time) => {
val first11 = rdd.take(11)
println ("-------------------------------------------")
@@ -277,33 +310,38 @@ extends Serializable with Logging {
}
val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc))
ssc.registerOutputStream(newStream)
- newStream
}
- def window(windowTime: Time, slideTime: Time) = new WindowedDStream(this, windowTime, slideTime)
+ def window(windowTime: Time): DStream[T] = window(windowTime, this.slideTime)
+
+ def window(windowTime: Time, slideTime: Time): DStream[T] = {
+ new WindowedDStream(this, windowTime, slideTime)
+ }
- def batch(batchTime: Time) = window(batchTime, batchTime)
+ def tumble(batchTime: Time): DStream[T] = window(batchTime, batchTime)
- def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Time, slideTime: Time) =
+ def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Time, slideTime: Time): DStream[T] = {
this.window(windowTime, slideTime).reduce(reduceFunc)
+ }
def reduceByWindow(
- reduceFunc: (T, T) => T,
- invReduceFunc: (T, T) => T,
- windowTime: Time,
- slideTime: Time) = {
+ reduceFunc: (T, T) => T,
+ invReduceFunc: (T, T) => T,
+ windowTime: Time,
+ slideTime: Time
+ ): DStream[T] = {
this.map(x => (1, x))
.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, 1)
.map(_._2)
}
- def countByWindow(windowTime: Time, slideTime: Time) = {
+ def countByWindow(windowTime: Time, slideTime: Time): DStream[Int] = {
def add(v1: Int, v2: Int) = (v1 + v2)
def subtract(v1: Int, v2: Int) = (v1 - v2)
this.map(_ => 1).reduceByWindow(add _, subtract _, windowTime, slideTime)
}
- def union(that: DStream[T]) = new UnifiedDStream(Array(this, that))
+ def union(that: DStream[T]): DStream[T] = new UnifiedDStream[T](Array(this, that))
def slice(interval: Interval): Seq[RDD[T]] = {
slice(interval.beginTime, interval.endTime)
@@ -336,8 +374,8 @@ abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContex
override def slideTime = {
if (ssc == null) throw new Exception("ssc is null")
- if (ssc.batchDuration == null) throw new Exception("ssc.batchDuration is null")
- ssc.batchDuration
+ if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
+ ssc.graph.batchDuration
}
def start()
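
Alongside the forgetting hooks, the DStream API above gains explicit return types, a one-argument window overload, and renames batch to tumble. A hedged usage sketch, assuming an existing DStream[Int] named stream and the Seconds helper; the durations are illustrative, not taken from the commit:

// Hypothetical usage; `stream` and the durations are assumptions for illustration.
// Explicit window and slide durations (behaviour unchanged, now typed as DStream[T]).
val windowed: DStream[Int] = stream.window(Seconds(10), Seconds(2))
// New overload: the slide duration defaults to the stream's own slideTime.
val defaultSlide: DStream[Int] = stream.window(Seconds(10))
// Renamed from batch(...): non-overlapping (tumbling) windows.
val tumbling: DStream[Int] = stream.tumble(Seconds(10))
// Element count per window, now declared as DStream[Int].
val counts: DStream[Int] = stream.countByWindow(Seconds(10), Seconds(2))
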
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index bcd365e932..964c8a26a0 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -11,7 +11,8 @@ final class DStreamGraph extends Serializable with Logging {
private val outputStreams = new ArrayBuffer[DStream[_]]()
private[streaming] var zeroTime: Time = null
- private[streaming] var checkpointInProgress = false;
+ private[streaming] var batchDuration: Time = null
+ private[streaming] var checkpointInProgress = false
def start(time: Time) {
this.synchronized {
@@ -20,6 +21,7 @@ final class DStreamGraph extends Serializable with Logging {
}
zeroTime = time
outputStreams.foreach(_.initialize(zeroTime))
+ outputStreams.foreach(_.setForgetTime())
inputStreams.par.foreach(_.start())
}
}
@@ -36,14 +38,28 @@ final class DStreamGraph extends Serializable with Logging {
}
}
+ def setBatchDuration(duration: Time) {
+ this.synchronized {
+ if (batchDuration != null) {
+ throw new Exception("Batch duration already set as " + batchDuration +
+ ". cannot set it again.")
+ }
+ }
+ batchDuration = duration
+ }
+
def addInputStream(inputStream: InputDStream[_]) {
- inputStream.setGraph(this)
- inputStreams += inputStream
+ this.synchronized {
+ inputStream.setGraph(this)
+ inputStreams += inputStream
+ }
}
def addOutputStream(outputStream: DStream[_]) {
- outputStream.setGraph(this)
- outputStreams += outputStream
+ this.synchronized {
+ outputStream.setGraph(this)
+ outputStreams += outputStream
+ }
}
def getInputStreams() = inputStreams.toArray
@@ -56,6 +72,20 @@ final class DStreamGraph extends Serializable with Logging {
}
}
+ def forgetOldRDDs(time: Time) {
+ this.synchronized {
+ outputStreams.foreach(_.forgetOldRDDs(time))
+ }
+ }
+
+ def validate() {
+ this.synchronized {
+ assert(batchDuration != null, "Batch duration has not been set")
+ assert(batchDuration > Milliseconds(100), "Batch duration of " + batchDuration + " is very low")
+ assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute")
+ }
+ }
+
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
this.synchronized {
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 3fd0a16bf0..0bd0321928 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -76,6 +76,13 @@ extends Serializable {
}
def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ windowTime: Time
+ ): DStream[(K, V)] = {
+ reduceByKeyAndWindow(reduceFunc, windowTime, stream.slideTime, defaultPartitioner())
+ }
+
+ def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowTime: Time,
slideTime: Time
@@ -106,7 +113,7 @@ extends Serializable {
// so that new elements introduced in the window can be "added" using
// reduceFunc to the previous window's result and old elements can be
// "subtracted using invReduceFunc.
-
+
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
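
The new two-argument reduceByKeyAndWindow overload above lets the slide duration default to the stream's own slideTime, delegating to the existing form with the default partitioner. An illustrative example, assuming pairs is an existing DStream[(String, Int)]:

// Hypothetical usage; `pairs` and the durations are assumptions for illustration.
// Slide duration defaults to pairs.slideTime.
val windowedSums = pairs.reduceByKeyAndWindow(_ + _, Seconds(30))
// The pre-existing overload, with the slide duration spelled out.
val windowedSumsExplicit = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
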
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index e161b5ba92..f3e95c9e2b 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -31,12 +31,15 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
@transient val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
- override def dependencies = List(reducedStream)
-
def windowTime: Time = _windowTime
+ override def dependencies = List(reducedStream)
+
override def slideTime: Time = _slideTime
+ //TODO: This is wrong. This should depend on the checkpointInterval
+ override def parentForgetTime: Time = forgetTime + windowTime
+
override def persist(
storageLevel: StorageLevel,
checkpointLevel: StorageLevel,
@@ -46,6 +49,13 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
this
}
+ protected[streaming] override def setForgetTime(time: Time) {
+ if (forgetTime == null || forgetTime < time) {
+ forgetTime = time
+ dependencies.foreach(_.setForgetTime(forgetTime + windowTime))
+ }
+ }
+
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
val reduceF = reduceFunc
val invReduceF = invReduceFunc
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 1e1425a88a..99e30b6110 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -20,7 +20,7 @@ extends Logging {
val jobManager = new JobManager(ssc, concurrentJobs)
val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
- val timer = new RecurringTimer(clock, ssc.batchDuration, generateRDDs(_))
+ val timer = new RecurringTimer(clock, ssc.graph.batchDuration, generateRDDs(_))
def start() {
// If context was started from checkpoint, then restart timer such that
@@ -53,11 +53,12 @@ extends Logging {
def generateRDDs(time: Time) {
SparkEnv.set(ssc.env)
logInfo("\n-----------------------------------------------------\n")
- logInfo("Generating RDDs for time " + time)
graph.generateRDDs(time).foreach(submitJob)
logInfo("Generated RDDs for time " + time)
+ graph.forgetOldRDDs(time)
if (ssc.checkpointInterval != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointInterval)) {
ssc.doCheckpoint(time)
+ logInfo("Checkpointed at time " + time)
}
}
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 62d21b83d9..b5f4571798 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -6,16 +6,11 @@ import spark.SparkEnv
import spark.SparkContext
import spark.storage.StorageLevel
-import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Queue
import java.io.InputStream
-import java.io.IOException
-import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicInteger
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
@@ -65,20 +60,15 @@ class StreamingContext (
}
val nextNetworkInputStreamId = new AtomicInteger(0)
-
- var batchDuration: Time = if (isCheckpointPresent) cp_.batchDuration else null
- var checkpointFile: String = if (isCheckpointPresent) cp_.checkpointFile else null
- var checkpointInterval: Time = if (isCheckpointPresent) cp_.checkpointInterval else null
var networkInputTracker: NetworkInputTracker = null
- var receiverJobThread: Thread = null
- var scheduler: Scheduler = null
+
+ private[streaming] var checkpointFile: String = if (isCheckpointPresent) cp_.checkpointFile else null
+ private[streaming] var checkpointInterval: Time = if (isCheckpointPresent) cp_.checkpointInterval else null
+ private[streaming] var receiverJobThread: Thread = null
+ private[streaming] var scheduler: Scheduler = null
def setBatchDuration(duration: Time) {
- if (batchDuration != null) {
- throw new Exception("Batch duration alread set as " + batchDuration +
- ". cannot set it again.")
- }
- batchDuration = duration
+ graph.setBatchDuration(duration)
}
def setCheckpointDetails(file: String, interval: Time) {
@@ -183,21 +173,17 @@ class StreamingContext (
graph.addOutputStream(outputStream)
}
- /**
- * This function validate whether the stream computation is eligible to be executed.
- */
- private def validate() {
- assert(batchDuration != null, "Batch duration has not been set")
- assert(batchDuration > Milliseconds(100), "Batch duration of " + batchDuration + " is very low")
+ def validate() {
assert(graph != null, "Graph is null")
- assert(graph.getOutputStreams().size > 0, "No output streams registered, so nothing to execute")
+ graph.validate()
}
-
+
/**
* This function starts the execution of the streams.
*/
def start() {
validate()
+
val networkInputStreams = graph.getInputStreams().filter(s => s match {
case n: NetworkInputDStream[_] => true
case _ => false
diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
index 93c1291691..2984f88284 100644
--- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
@@ -18,12 +18,14 @@ class WindowedDStream[T: ClassManifest](
throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
- override def dependencies = List(parent)
-
def windowTime: Time = _windowTime
+ override def dependencies = List(parent)
+
override def slideTime: Time = _slideTime
+ override def parentForgetTime: Time = forgetTime + windowTime
+
override def compute(validTime: Time): Option[RDD[T]] = {
val currentWindow = Interval(validTime - windowTime + parent.slideTime, validTime)
Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
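
Both windowed stream classes override parentForgetTime as forgetTime + windowTime, so a parent is asked to keep its RDDs long enough for the newest window to still be recomputed. As a rough worked example with the durations used in the test below (1-second batches, a 2-second window feeding a 4-second reduce window): the ReducedWindowedDStream forgets after 1s but tells its parent to retain RDDs for 1s + 4s = 5s, and the 2-second WindowedDStream in turn pushes its mapped parent's retention to 5s + 2s = 7s.
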
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 11cecf9822..061b331a16 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -1,6 +1,7 @@
package spark.streaming
import spark.streaming.StreamingContext._
+import java.io.File
class CheckpointSuite extends DStreamSuiteBase {
@@ -14,17 +15,16 @@ class CheckpointSuite extends DStreamSuiteBase {
expectedOutput: Seq[Seq[V]],
useSet: Boolean = false
) {
- System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
// Current code assumes that:
// number of inputs = number of outputs = number of batches to be run
- // Do half the computation (half the number of batches), create checkpoint file and quit
val totalNumBatches = input.size
val initialNumBatches = input.size / 2
val nextNumBatches = totalNumBatches - initialNumBatches
val initialNumExpectedOutputs = initialNumBatches
+ // Do half the computation (half the number of batches), create checkpoint file and quit
val ssc = setupStreams[U, V](input, operation)
val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs)
verifyOutput[V](output, expectedOutput.take(initialNumBatches), useSet)
@@ -35,6 +35,11 @@ class CheckpointSuite extends DStreamSuiteBase {
sscNew.setCheckpointDetails(null, null)
val outputNew = runStreams[V](sscNew, nextNumBatches, expectedOutput.size)
verifyOutput[V](outputNew, expectedOutput, useSet)
+
+ new File(checkpointFile).delete()
+ new File(checkpointFile + ".bk").delete()
+ new File("." + checkpointFile + ".crc").delete()
+ new File("." + checkpointFile + ".bk.crc").delete()
}
test("simple per-batch operation") {
diff --git a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
index f8ca7febe7..5dd8b675b1 100644
--- a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
@@ -2,16 +2,21 @@ package spark.streaming
import spark.streaming.StreamingContext._
import scala.runtime.RichInt
+import util.ManualClock
class DStreamBasicSuite extends DStreamSuiteBase {
- test("map-like operations") {
+ test("map") {
+ val input = Seq(1 to 4, 5 to 8, 9 to 12)
+ testOperation(
+ input,
+ (r: DStream[Int]) => r.map(_.toString),
+ input.map(_.map(_.toString))
+ )
+ }
+
+ test("flatmap") {
val input = Seq(1 to 4, 5 to 8, 9 to 12)
-
- // map
- testOperation(input, (r: DStream[Int]) => r.map(_.toString), input.map(_.map(_.toString)))
-
- // flatMap
testOperation(
input,
(r: DStream[Int]) => r.flatMap(x => Seq(x, x * 2)),
@@ -19,16 +24,16 @@ class DStreamBasicSuite extends DStreamSuiteBase {
)
}
- test("shuffle-based operations") {
- // reduceByKey
+ test("reduceByKey") {
testOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ),
(s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
Seq( Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq() ),
true
)
+ }
- // reduce
+ test("reduce") {
testOperation(
Seq(1 to 4, 5 to 8, 9 to 12),
(s: DStream[Int]) => s.reduce(_ + _),
@@ -57,7 +62,7 @@ class DStreamBasicSuite extends DStreamSuiteBase {
Seq(("a", 5), ("b", 3), ("c", 1))
)
- val updateStateOp = (s: DStream[String]) => {
+ val updateStateOperation = (s: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: RichInt) => {
var newState = 0
if (values != null && values.size > 0) newState += values.reduce(_ + _)
@@ -67,6 +72,66 @@ class DStreamBasicSuite extends DStreamSuiteBase {
s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
}
- testOperation(inputData, updateStateOp, outputData, true)
+ testOperation(inputData, updateStateOperation, outputData, true)
+ }
+
+ test("forgetting of RDDs") {
+ assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second")
+
+ val input = Seq(1 to 4, 5 to 8, 9 to 12, 13 to 16, 17 to 20, 21 to 24, 25 to 28, 29 to 32)
+
+ assert(input.size % 4 === 0, "Number of inputs should be a multiple of 4")
+
+ def operation(s: DStream[Int]): DStream[(Int, Int)] = {
+ s.map(x => (x % 10, 1))
+ .window(Seconds(2), Seconds(1))
+ .reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(1))
+ }
+
+ val ssc = setupStreams(input, operation _)
+ runStreams[(Int, Int)](ssc, input.size, input.size)
+
+ val reducedWindowedStream = ssc.graph.getOutputStreams().head.dependencies.head
+ .asInstanceOf[ReducedWindowedDStream[Int, Int]]
+ val windowedStream = reducedWindowedStream.dependencies.head.dependencies.head
+ .asInstanceOf[WindowedDStream[(Int, Int)]]
+ val mappedStream = windowedStream.dependencies.head
+
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val finalTime = Seconds(7)
+ //assert(clock.time === finalTime.milliseconds)
+
+ // ReducedWindowedStream should remember the last RDD created
+ assert(reducedWindowedStream.generatedRDDs.contains(finalTime))
+
+ // ReducedWindowedStream should have forgotten the previous to last RDD created
+ assert(!reducedWindowedStream.generatedRDDs.contains(finalTime - reducedWindowedStream.slideTime))
+
+ // WindowedStream should remember the last RDD created
+ assert(windowedStream.generatedRDDs.contains(finalTime))
+
+ // WindowedStream should still remember the previous to last RDD created
+ // as the last RDD of ReducedWindowedStream requires that RDD
+ assert(windowedStream.generatedRDDs.contains(finalTime - windowedStream.slideTime))
+
+ // WindowedStream should have forgotten this RDD as the last RDD of
+ // ReducedWindowedStream DOES NOT require this RDD
+ assert(!windowedStream.generatedRDDs.contains(finalTime - windowedStream.slideTime - reducedWindowedStream.windowTime))
+
+ // MappedStream should remember the last RDD created
+ assert(mappedStream.generatedRDDs.contains(finalTime))
+
+ // MappedStream should still remember the previous to last RDD created
+ // as the last RDD of WindowedStream requires that RDD
+ assert(mappedStream.generatedRDDs.contains(finalTime - mappedStream.slideTime))
+
+ // MappedStream should still remember this RDD as the last RDD of
+ // ReducedWindowedStream requires that RDD (even though the last RDD of
+ // WindowedStream does not need it)
+ assert(mappedStream.generatedRDDs.contains(finalTime - windowedStream.windowTime))
+
+ // MappedStream should have forgotten this RDD as the last RDD of
+ // ReducedWindowedStream DOES NOT require this RDD
+ assert(!mappedStream.generatedRDDs.contains(finalTime - mappedStream.slideTime - windowedStream.windowTime - reducedWindowedStream.windowTime))
}
}
diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
index 91ffc0c098..6e5a7a58bb 100644
--- a/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
@@ -35,6 +35,8 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu
trait DStreamSuiteBase extends FunSuite with Logging {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
def framework() = "DStreamSuiteBase"
def master() = "local[2]"
@@ -73,6 +75,9 @@ trait DStreamSuiteBase extends FunSuite with Logging {
numBatches: Int,
numExpectedOutput: Int
): Seq[Seq[V]] = {
+
+ assert(numBatches > 0, "Number of batches to run stream computation is zero")
+ assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)
// Get the output buffer
@@ -150,8 +155,6 @@ trait DStreamSuiteBase extends FunSuite with Logging {
numBatches: Int,
useSet: Boolean
) {
- System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
val ssc = setupStreams[U, V](input, operation)
val output = runStreams[V](ssc, numBatches_, expectedOutput.size)