author     Tathagata Das <tathagata.das1565@gmail.com>    2014-01-13 14:57:07 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>    2014-01-13 14:57:07 -0800
commit     27311b13321ba60ee1324b86234f0aaf63df9f67 (patch)
tree       c2b813a32ac9e019ae67399ad58d120c8f03d813 /streaming/src
parent     b93f9d42f21f03163734ef97b2871db945e166da (diff)
Added unpersisting and modified testsuite to better test out metadata cleaning.
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala        3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala                 11
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala    2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala            72
4 files changed, 61 insertions, 27 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 1ec4492bca..a493a8279f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -208,7 +208,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
}
-
/**
* Return a new DStream in which each RDD has a single element generated by reducing all
* elements in a sliding window over this DStream. However, the reduction is done incrementally
@@ -410,7 +409,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
}
/**
- * Enable periodic checkpointing of RDDs of this DStream
+ * Enable periodic checkpointing of RDDs of this DStream.
* @param interval Time interval after which generated RDD will be checkpointed
*/
def checkpoint(interval: Duration) = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index a7c4cca7ea..9e2b36f443 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -334,6 +334,10 @@ abstract class DStream[T: ClassTag] (
private[streaming] def clearMetadata(time: Time) {
val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
generatedRDDs --= oldRDDs.keys
+ if (ssc.conf.getBoolean("spark.streaming.unpersist", false)) {
+ logDebug("Unpersisting old RDDs: " + oldRDDs.keys.mkString(", "))
+ oldRDDs.values.foreach(_.unpersist(false))
+ }
logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
(time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
dependencies.foreach(_.clearMetadata(time))
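
With this change, RDDs that fall out of a DStream's remember window are eagerly unpersisted (non-blocking, hence unpersist(false)) whenever spark.streaming.unpersist is set; the flag defaults to false, preserving the old eviction-only behavior. A minimal sketch of opting in from application code, with an illustrative app name and master URL:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Opt in to eager unpersisting of RDDs older than the remember window.
    // The app name and master are illustrative placeholders.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("UnpersistExample")
      .set("spark.streaming.unpersist", "true") // defaults to false, per clearMetadata above
    val ssc = new StreamingContext(conf, Seconds(1))
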
@@ -751,7 +755,12 @@ abstract class DStream[T: ClassTag] (
this.foreachRDD(saveFunc)
}
- def register() {
+ /**
+ * Register this DStream as an output stream. This ensures that RDDs of this
+ * DStream will be generated.
+ */
+ def register(): DStream[T] = {
ssc.registerOutputStream(this)
+ this
}
}
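
Since register() now returns the DStream it was invoked on, registering an output stream can close a fluent chain instead of requiring a separate statement. A hedged sketch, where lines stands for any input DStream (e.g. from ssc.socketTextStream) and the transformations are illustrative:

    import org.apache.spark.streaming.StreamingContext._ // implicit pair-DStream operations

    // register() registers the stream as an output and returns `this`,
    // so the result can still be captured for later use.
    val wordCounts = lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .register()
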
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 2da4127f47..38bad5ac80 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -94,7 +94,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
}
}
case None =>
- logInfo("Nothing to delete")
+ logDebug("Nothing to delete")
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 7037aae234..b73edf81d4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -24,7 +24,9 @@ import org.apache.spark.SparkContext._
import util.ManualClock
import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{WindowedDStream, DStream}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.reflect.ClassTag
class BasicOperationsSuite extends TestSuiteBase {
test("map") {
@@ -395,40 +397,31 @@ class BasicOperationsSuite extends TestSuiteBase {
Thread.sleep(1000)
}
- test("forgetting of RDDs - map and window operations") {
- assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second")
+ val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq
- val input = (0 until 10).map(x => Seq(x, x + 1)).toSeq
+ test("rdd cleanup - map and window") {
val rememberDuration = Seconds(3)
-
- assert(input.size === 10, "Number of inputs have changed")
-
def operation(s: DStream[Int]): DStream[(Int, Int)] = {
s.map(x => (x % 10, 1))
.window(Seconds(2), Seconds(1))
.window(Seconds(4), Seconds(2))
}
- val ssc = setupStreams(input, operation _)
- ssc.remember(rememberDuration)
- runStreams[(Int, Int)](ssc, input.size, input.size / 2)
-
- val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head
- val windowedStream1 = windowedStream2.dependencies.head
+ val operatedStream = runCleanupTest(conf, operation _,
+ numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = Seconds(3))
+ val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
+ val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
val mappedStream = windowedStream1.dependencies.head
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- assert(clock.time === Seconds(10).milliseconds)
-
- // IDEALLY
- // WindowedStream2 should remember till 7 seconds: 10, 8,
- // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5
- // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3,
+ // Checkpoint remember durations
+ assert(windowedStream2.rememberDuration === rememberDuration)
+ assert(windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration)
+ assert(mappedStream.rememberDuration ===
+ rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration)
- // IN THIS TEST
- // WindowedStream2 should remember till 7 seconds: 10, 8,
+ // WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7
// WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4
- // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2
+ // MappedStream should remember till 2 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2
// WindowedStream2
assert(windowedStream2.generatedRDDs.contains(Time(10000)))
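
The rememberDuration assertions above encode how the remember window widens up the dependency chain: each upstream stream must remember its downstream consumer's remember duration plus that consumer's window duration. With rememberDuration = 3 s and window durations of 4 s (windowedStream2) and 2 s (windowedStream1), that works out to:

    windowedStream2.rememberDuration = 3 s
    windowedStream1.rememberDuration = 3 s + 4 s       = 7 s
    mappedStream.rememberDuration    = 3 s + 4 s + 2 s = 9 s

which matches the "should remember till" comments above.
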
@@ -445,4 +438,37 @@ class BasicOperationsSuite extends TestSuiteBase {
assert(mappedStream.generatedRDDs.contains(Time(2000)))
assert(!mappedStream.generatedRDDs.contains(Time(1000)))
}
+
+ test("rdd cleanup - updateStateByKey") {
+ val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+ Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
+ }
+ val stateStream = runCleanupTest(
+ conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3)))
+
+ assert(stateStream.rememberDuration === stateStream.checkpointDuration * 2)
+ assert(stateStream.generatedRDDs.contains(Time(10000)))
+ assert(!stateStream.generatedRDDs.contains(Time(4000)))
+ }
+
+ /** Test cleanup of RDDs in DStream metadata */
+ def runCleanupTest[T: ClassTag](
+ conf2: SparkConf,
+ operation: DStream[Int] => DStream[T],
+ numExpectedOutput: Int = cleanupTestInput.size,
+ rememberDuration: Duration = null
+ ): DStream[T] = {
+
+ // Setup the stream computation
+ assert(batchDuration === Seconds(1),
+ "Batch duration has changed from 1 second, check cleanup tests")
+ val ssc = setupStreams(cleanupTestInput, operation)
+ val operatedStream = ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]]
+ if (rememberDuration != null) ssc.remember(rememberDuration)
+ val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ assert(clock.time === Seconds(10).milliseconds)
+ assert(output.size === numExpectedOutput)
+ operatedStream
+ }
}
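
The updateStateByKey case relies on the rule asserted above: a checkpointed state stream keeps its RDDs for twice its checkpoint interval. A standalone sketch of the same pattern, assuming an illustrative input DStream words, an existing StreamingContext ssc, and a placeholder checkpoint directory:

    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.StreamingContext._ // implicit pair-DStream operations

    // Fold each batch's values into the running per-key state,
    // mirroring updateFunc in the test above.
    val updateFunc = (values: Seq[Int], state: Option[Int]) =>
      Some(values.sum + state.getOrElse(0))

    ssc.checkpoint("/tmp/checkpoint") // illustrative path; state streams require a checkpoint dir
    val stateStream = words
      .map(_ -> 1)
      .updateStateByKey(updateFunc)
      .checkpoint(Seconds(3)) // state RDDs are then remembered for ~2x this interval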