author     Patrick Wendell <pwendell@gmail.com>  2014-01-13 22:29:03 -0800
committer  Patrick Wendell <pwendell@gmail.com>  2014-01-13 22:29:03 -0800
commit     08b9fec93d00ff0ebb49af4d9ac72d2806eded02 (patch)
tree       8dc8913638f6112e60c6c4166ec968946ea54e8e /streaming
parent     b07bc02a00881822d2cb350f20dca31ddbeed54b (diff)
parent     27311b13321ba60ee1324b86234f0aaf63df9f67 (diff)
Merge pull request #409 from tdas/unpersist
Automatically unpersisting RDDs that have been cleaned up from DStreams

Earlier, RDDs generated by DStreams were forgotten but not unpersisted; the system relied on the BlockManager's natural LRU eviction to drop the data. The cleaner.ttl was a hammer for cleaning up RDDs, but it has to be set separately and very conservatively (a few minutes at best). This automatic unpersisting lets the system handle cleanup on its own, which reduces memory usage. As a side effect it also improves GC performance, since fewer objects are kept in memory. In fact, for some workloads it may allow RDDs to be cached deserialized, which speeds up processing without too much GC overhead.

This is disabled by default. To enable it, set the configuration spark.streaming.unpersist to true. In a future release this will default to true.

Also, reduced the sleep time in TaskSchedulerImpl.stop() from 5 seconds to 1 second. From my conversation with Matei, there does not seem to be any good reason for the sleep that lets messages be sent out to be so long.
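To make the new setting concrete, here is a minimal sketch of how an application could opt in; the app name, batch interval, and the rest of the setup are illustrative assumptions, not part of this change.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Sketch only: opt in to automatic unpersisting of old DStream RDDs.
    // spark.streaming.unpersist defaults to false in this release.
    val conf = new SparkConf()
      .setAppName("UnpersistExample")            // illustrative app name
      .set("spark.streaming.unpersist", "true")

    val ssc = new StreamingContext(conf, Seconds(1))  // illustrative batch interval
    // ... define DStreams and output operations here ...
    ssc.start()
    ssc.awaitTermination()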
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala      3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala              11
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala  2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala         72
4 files changed, 61 insertions, 27 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 1ec4492bca..a493a8279f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -208,7 +208,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
}
-
/**
* Return a new DStream in which each RDD has a single element generated by reducing all
* elements in a sliding window over this DStream. However, the reduction is done incrementally
@@ -410,7 +409,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
}
/**
- * Enable periodic checkpointing of RDDs of this DStream
+ * Enable periodic checkpointing of RDDs of this DStream.
* @param interval Time interval after which generated RDD will be checkpointed
*/
def checkpoint(interval: Duration) = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 9dfcc08abe..426f61e24a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -340,6 +340,10 @@ abstract class DStream[T: ClassTag] (
private[streaming] def clearMetadata(time: Time) {
val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
generatedRDDs --= oldRDDs.keys
+ if (ssc.conf.getBoolean("spark.streaming.unpersist", false)) {
+ logDebug("Unpersisting old RDDs: " + oldRDDs.keys.mkString(", "))
+ oldRDDs.values.foreach(_.unpersist(false))
+ }
logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
(time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
dependencies.foreach(_.clearMetadata(time))
@@ -760,7 +764,12 @@ abstract class DStream[T: ClassTag] (
this.foreachRDD(saveFunc)
}
- def register() {
+ /**
+ * Register this DStream as an output stream. This will ensure that RDDs of this
+ * DStream will be generated.
+ */
+ def register(): DStream[T] = {
ssc.registerOutputStream(this)
+ this
}
}
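The register() change above makes the method return the DStream itself instead of Unit, so output registration can be chained onto a transformation. A rough usage sketch, assuming ssc is an existing StreamingContext (the socket source and word-count logic are illustrative only):

    import org.apache.spark.streaming.StreamingContext._   // pair DStream operations

    // Sketch only: register() now returns this DStream, so the registered
    // stream can still be assigned and reused after registration.
    val lines  = ssc.socketTextStream("localhost", 9999)   // illustrative source
    val counts = lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .register()                                          // returns DStream[(String, Int)]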
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 2da4127f47..38bad5ac80 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -94,7 +94,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
}
}
case None =>
- logInfo("Nothing to delete")
+ logDebug("Nothing to delete")
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 7037aae234..b73edf81d4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -24,7 +24,9 @@ import org.apache.spark.SparkContext._
import util.ManualClock
import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{WindowedDStream, DStream}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.reflect.ClassTag
class BasicOperationsSuite extends TestSuiteBase {
test("map") {
@@ -395,40 +397,31 @@ class BasicOperationsSuite extends TestSuiteBase {
Thread.sleep(1000)
}
- test("forgetting of RDDs - map and window operations") {
- assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second")
+ val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq
- val input = (0 until 10).map(x => Seq(x, x + 1)).toSeq
+ test("rdd cleanup - map and window") {
val rememberDuration = Seconds(3)
-
- assert(input.size === 10, "Number of inputs have changed")
-
def operation(s: DStream[Int]): DStream[(Int, Int)] = {
s.map(x => (x % 10, 1))
.window(Seconds(2), Seconds(1))
.window(Seconds(4), Seconds(2))
}
- val ssc = setupStreams(input, operation _)
- ssc.remember(rememberDuration)
- runStreams[(Int, Int)](ssc, input.size, input.size / 2)
-
- val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head
- val windowedStream1 = windowedStream2.dependencies.head
+ val operatedStream = runCleanupTest(conf, operation _,
+ numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = Seconds(3))
+ val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
+ val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
val mappedStream = windowedStream1.dependencies.head
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- assert(clock.time === Seconds(10).milliseconds)
-
- // IDEALLY
- // WindowedStream2 should remember till 7 seconds: 10, 8,
- // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5
- // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3,
+ // Checkpoint remember durations
+ assert(windowedStream2.rememberDuration === rememberDuration)
+ assert(windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration)
+ assert(mappedStream.rememberDuration ===
+ rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration)
- // IN THIS TEST
- // WindowedStream2 should remember till 7 seconds: 10, 8,
+ // WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7
// WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4
- // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2
+ // MappedStream should remember till 2 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2
// WindowedStream2
assert(windowedStream2.generatedRDDs.contains(Time(10000)))
@@ -445,4 +438,37 @@ class BasicOperationsSuite extends TestSuiteBase {
assert(mappedStream.generatedRDDs.contains(Time(2000)))
assert(!mappedStream.generatedRDDs.contains(Time(1000)))
}
+
+ test("rdd cleanup - updateStateByKey") {
+ val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+ Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
+ }
+ val stateStream = runCleanupTest(
+ conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3)))
+
+ assert(stateStream.rememberDuration === stateStream.checkpointDuration * 2)
+ assert(stateStream.generatedRDDs.contains(Time(10000)))
+ assert(!stateStream.generatedRDDs.contains(Time(4000)))
+ }
+
+ /** Test cleanup of RDDs in DStream metadata */
+ def runCleanupTest[T: ClassTag](
+ conf2: SparkConf,
+ operation: DStream[Int] => DStream[T],
+ numExpectedOutput: Int = cleanupTestInput.size,
+ rememberDuration: Duration = null
+ ): DStream[T] = {
+
+ // Setup the stream computation
+ assert(batchDuration === Seconds(1),
+ "Batch duration has changed from 1 second, check cleanup tests")
+ val ssc = setupStreams(cleanupTestInput, operation)
+ val operatedStream = ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]]
+ if (rememberDuration != null) ssc.remember(rememberDuration)
+ val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ assert(clock.time === Seconds(10).milliseconds)
+ assert(output.size === numExpectedOutput)
+ operatedStream
+ }
}
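For context, the new "rdd cleanup - updateStateByKey" test mirrors a common application pattern: a per-key running count whose state stream is checkpointed periodically, so older generated RDDs become eligible for cleanup (and, with spark.streaming.unpersist set to true, for unpersisting). An application-level sketch of that pattern, assuming ssc is a StreamingContext with ssc.checkpoint(...) already set and lines is an existing DStream[String]:

    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.StreamingContext._   // pair DStream operations

    // Illustrative state update function: running count per key.
    val runningCount = (values: Seq[Int], state: Option[Int]) =>
      Some(values.sum + state.getOrElse(0))

    val counts = lines
      .map(word => (word, 1))
      .updateStateByKey(runningCount)
    counts.checkpoint(Seconds(3))   // same checkpoint interval as the test
    counts.print()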