author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-04-24 18:18:22 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-04-24 18:18:22 -0700 |
commit | 526a518bf32ad55b926a26f16086f445fd0ae29f (patch) | |
tree | dc4bcf8aa155aae8fa5e5bdeb40b47c423745b9d /streaming/src/test | |
parent | 35e3d199f04fba3230625002a458d43b9578b2e8 (diff) | |
download | spark-526a518bf32ad55b926a26f16086f445fd0ae29f.tar.gz spark-526a518bf32ad55b926a26f16086f445fd0ae29f.tar.bz2 spark-526a518bf32ad55b926a26f16086f445fd0ae29f.zip |
[SPARK-1592][streaming] Automatically remove streaming input blocks
The raw input data is stored as blocks in BlockManagers. Earlier, these blocks were cleared by the cleaner TTL. Now that streaming no longer requires the cleaner TTL to be set, the blocks never get cleared. This increases Spark's memory usage, which is neither accounted for nor shown in the Spark storage UI. It may also cause the data blocks to spill over to disk, which eventually slows down the receiving of data (persisting to memory becomes bottlenecked by writing to disk).
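For context, a minimal configuration fragment showing the earlier workaround, assuming the generic TTL-based metadata cleaner (`spark.cleaner.ttl`); this is not part of the change in this PR:

```scala
import org.apache.spark.SparkConf

// Earlier workaround sketch (assumption: relying on the generic TTL-based cleaner,
// nothing introduced by this PR): data and metadata older than the TTL, given in
// seconds, are periodically dropped.
val conf = new SparkConf().set("spark.cleaner.ttl", "3600")
```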
The solution in this PR is to automatically remove those blocks. The mechanism for tracking which BlockRDDs (the RDDs that present the raw data blocks as an RDD) can be safely cleared already exists; this change simply uses it to explicitly remove the blocks backing those BlockRDDs.
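For illustration, a minimal sketch of the idea; the `InputBlockCleanup` helper below is hypothetical (not the code added by this PR) and assumes it lives inside the Spark source tree, since `BlockRDD` and `BlockManagerMaster` are Spark-internal APIs:

```scala
// Hypothetical sketch, assumed to be compiled inside the Spark source tree so that
// the internal BlockRDD and BlockManagerMaster APIs are accessible.
package org.apache.spark.streaming

import org.apache.spark.SparkEnv
import org.apache.spark.rdd.BlockRDD

object InputBlockCleanup {
  /** Drop the raw input blocks backing a BlockRDD that streaming no longer needs. */
  def removeInputBlocks(rdd: BlockRDD[_]): Unit = {
    val blockManagerMaster = SparkEnv.get.blockManager.master
    rdd.blockIds.foreach { blockId =>
      blockManagerMaster.removeBlock(blockId) // frees memory/disk held by the raw input block
    }
  }
}
```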
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #512 from tdas/block-rdd-unpersist and squashes the following commits:
d25e610 [Tathagata Das] Merge remote-tracking branch 'apache/master' into block-rdd-unpersist
5f46d69 [Tathagata Das] Merge remote-tracking branch 'apache/master' into block-rdd-unpersist
2c320cd [Tathagata Das] Updated configuration with spark.streaming.unpersist setting.
2d4b2fd [Tathagata Das] Automatically removed input blocks
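As a usage sketch for the `spark.streaming.unpersist` setting mentioned in the commit log above (the app name, `local[2]` master, and 1-second batch interval are arbitrary choices, and the actual input/output operations are elided): with the setting enabled, streaming unpersists generated RDDs and, with this change, also removes the raw input blocks once they are no longer needed.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Usage sketch: enable automatic unpersisting of old RDDs and input blocks.
object UnpersistExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("UnpersistExample")
      .set("spark.streaming.unpersist", "true")
    val ssc = new StreamingContext(conf, Seconds(1))
    // ... define input streams and output operations here ...
    ssc.start()
    ssc.awaitTermination()
  }
}
```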
Diffstat (limited to 'streaming/src/test')
3 files changed, 76 insertions, 14 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 8aec27e394..4792ca1f8a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming
 
 import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.SparkContext._
 import util.ManualClock
@@ -27,6 +27,8 @@ import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.streaming.dstream.{WindowedDStream, DStream}
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import scala.reflect.ClassTag
+import org.apache.spark.storage.StorageLevel
+import scala.collection.mutable
 
 class BasicOperationsSuite extends TestSuiteBase {
   test("map") {
@@ -450,6 +452,78 @@ class BasicOperationsSuite extends TestSuiteBase {
     assert(!stateStream.generatedRDDs.contains(Time(4000)))
   }
 
+  test("rdd cleanup - input blocks and persisted RDDs") {
+    // Actually receive data over through receiver to create BlockRDDs
+
+    // Start the server
+    val testServer = new TestServer()
+    testServer.start()
+
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
+    val mappedStream = networkStream.map(_ + ".").persist()
+    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+    val outputStream = new TestOutputStream(mappedStream, outputBuffer)
+
+    outputStream.register()
+    ssc.start()
+
+    // Feed data to the server to send to the network receiver
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val input = Seq(1, 2, 3, 4, 5, 6)
+
+    val blockRdds = new mutable.HashMap[Time, BlockRDD[_]]
+    val persistentRddIds = new mutable.HashMap[Time, Int]
+
+    def collectRddInfo() { // get all RDD info required for verification
+      networkStream.generatedRDDs.foreach { case (time, rdd) =>
+        blockRdds(time) = rdd.asInstanceOf[BlockRDD[_]]
+      }
+      mappedStream.generatedRDDs.foreach { case (time, rdd) =>
+        persistentRddIds(time) = rdd.id
+      }
+    }
+
+    Thread.sleep(200)
+    for (i <- 0 until input.size) {
+      testServer.send(input(i).toString + "\n")
+      Thread.sleep(200)
+      clock.addToTime(batchDuration.milliseconds)
+      collectRddInfo()
+    }
+
+    Thread.sleep(200)
+    collectRddInfo()
+    logInfo("Stopping server")
+    testServer.stop()
+    logInfo("Stopping context")
+
+    // verify data has been received
+    assert(outputBuffer.size > 0)
+    assert(blockRdds.size > 0)
+    assert(persistentRddIds.size > 0)
+
+    import Time._
+
+    val latestPersistedRddId = persistentRddIds(persistentRddIds.keySet.max)
+    val earliestPersistedRddId = persistentRddIds(persistentRddIds.keySet.min)
+    val latestBlockRdd = blockRdds(blockRdds.keySet.max)
+    val earliestBlockRdd = blockRdds(blockRdds.keySet.min)
+    // verify that the latest mapped RDD is persisted but the earliest one has been unpersisted
+    assert(ssc.sparkContext.persistentRdds.contains(latestPersistedRddId))
+    assert(!ssc.sparkContext.persistentRdds.contains(earliestPersistedRddId))
+
+    // verify that the latest input blocks are present but the earliest blocks have been removed
+    assert(latestBlockRdd.isValid)
+    assert(latestBlockRdd.collect != null)
+    assert(!earliestBlockRdd.isValid)
+    earliestBlockRdd.blockIds.foreach { blockId =>
+      assert(!ssc.sparkContext.env.blockManager.master.contains(blockId))
+    }
+    ssc.stop()
+  }
+
   /** Test cleanup of RDDs in DStream metadata */
   def runCleanupTest[T: ClassTag](
       conf2: SparkConf,
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 3bad871b5c..b55b7834c9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -42,8 +42,6 @@ import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
 
 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
-  val testPort = 9999
-
   test("socket input stream") {
     // Start the server
     val testServer = new TestServer()
@@ -288,17 +286,6 @@ class TestServer(portToBind: Int = 0) extends Logging {
 
   def port = serverSocket.getLocalPort
 }
 
-object TestServer {
-  def main(args: Array[String]) {
-    val s = new TestServer()
-    s.start()
-    while(true) {
-      Thread.sleep(1000)
-      s.send("hello")
-    }
-  }
-}
-
 /** This is an actor for testing actor input stream */
 class TestActor(port: Int) extends Actor with ActorHelper {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
index 45304c76b0..ff3619a590 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -29,6 +29,7 @@ import org.scalatest.FunSuite
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
+import scala.language.postfixOps
 
 /** Testsuite for testing the network receiver behavior */
 class NetworkReceiverSuite extends FunSuite with Timeouts {