Diffstat (limited to 'streaming')
 streaming/src/main/scala/org/apache/spark/streaming/Time.scala                 |  2 +-
 streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala      | 16 +++++++++---
 streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 76 ++++++++++++++++-
 streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala    | 13 -----------
 streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala |  1 +
 5 files changed, 90 insertions(+), 18 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
index 6a6b00a778..37b3b28fa0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
@@ -68,5 +68,5 @@ case class Time(private val millis: Long) {
}
object Time {
- val ordering = Ordering.by((time: Time) => time.millis)
+ implicit val ordering = Ordering.by((time: Time) => time.millis)
}
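Note: making the ordering implicit lets any collection method that needs an Ordering[Time] (max, min, sorted, and so on) resolve it automatically from Time's companion object; the new test below depends on this when it calls keySet.max on a set of Time keys. A minimal sketch of the effect, with made-up Time values:

    import org.apache.spark.streaming.Time

    val times = Set(Time(1000), Time(2000), Time(3000))
    // Resolves the implicit Ordering[Time] from the companion object;
    // with the previous non-implicit val this line would not compile.
    val latest = times.max  // Time(3000)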
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index d393cc03cb..f69f69e0c4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.MetadataCleaner
import org.apache.spark.streaming._
@@ -340,13 +340,23 @@ abstract class DStream[T: ClassTag] (
* this to clear their own metadata along with the generated RDDs.
*/
private[streaming] def clearMetadata(time: Time) {
+ val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
logDebug("Clearing references to old RDDs: [" +
oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
generatedRDDs --= oldRDDs.keys
- if (ssc.conf.getBoolean("spark.streaming.unpersist", false)) {
+ if (unpersistData) {
logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", "))
- oldRDDs.values.foreach(_.unpersist(false))
+ oldRDDs.values.foreach { rdd =>
+ rdd.unpersist(false)
+ // Explicitly remove blocks of BlockRDD
+ rdd match {
+ case b: BlockRDD[_] =>
+ logInfo("Removing blocks of RDD " + b + " of time " + time)
+ b.removeBlocks()
+ case _ =>
+ }
+ }
}
logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
(time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
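Note: unpersist(false) only drops cached copies of an RDD; for a BlockRDD the underlying input blocks written by the receiver live in the BlockManager and must be removed explicitly, which is what the pattern match above does. The same pattern in isolation, as a hypothetical helper that is not itself part of this patch:

    import org.apache.spark.rdd.{BlockRDD, RDD}

    def cleanup(rdd: RDD[_]): Unit = {
      rdd.unpersist(false)  // drop any cached partitions
      rdd match {
        case b: BlockRDD[_] => b.removeBlocks()  // also free receiver-written blocks
        case _              =>                   // nothing extra to free
      }
    }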
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 8aec27e394..4792ca1f8a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming
import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.SparkContext._
import util.ManualClock
@@ -27,6 +27,8 @@ import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.dstream.{WindowedDStream, DStream}
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.reflect.ClassTag
+import org.apache.spark.storage.StorageLevel
+import scala.collection.mutable
class BasicOperationsSuite extends TestSuiteBase {
test("map") {
@@ -450,6 +452,78 @@ class BasicOperationsSuite extends TestSuiteBase {
assert(!stateStream.generatedRDDs.contains(Time(4000)))
}
+ test("rdd cleanup - input blocks and persisted RDDs") {
+ // Actually receive data through a network receiver to create BlockRDDs
+
+ // Start the server
+ val testServer = new TestServer()
+ testServer.start()
+
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(conf, batchDuration)
+ val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
+ val mappedStream = networkStream.map(_ + ".").persist()
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+ val outputStream = new TestOutputStream(mappedStream, outputBuffer)
+
+ outputStream.register()
+ ssc.start()
+
+ // Feed data to the server to send to the network receiver
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val input = Seq(1, 2, 3, 4, 5, 6)
+
+ val blockRdds = new mutable.HashMap[Time, BlockRDD[_]]
+ val persistentRddIds = new mutable.HashMap[Time, Int]
+
+ def collectRddInfo() { // get all RDD info required for verification
+ networkStream.generatedRDDs.foreach { case (time, rdd) =>
+ blockRdds(time) = rdd.asInstanceOf[BlockRDD[_]]
+ }
+ mappedStream.generatedRDDs.foreach { case (time, rdd) =>
+ persistentRddIds(time) = rdd.id
+ }
+ }
+
+ Thread.sleep(200)
+ for (i <- 0 until input.size) {
+ testServer.send(input(i).toString + "\n")
+ Thread.sleep(200)
+ clock.addToTime(batchDuration.milliseconds)
+ collectRddInfo()
+ }
+
+ Thread.sleep(200)
+ collectRddInfo()
+ logInfo("Stopping server")
+ testServer.stop()
+ logInfo("Stopping context")
+
+ // verify data has been received
+ assert(outputBuffer.size > 0)
+ assert(blockRdds.size > 0)
+ assert(persistentRddIds.size > 0)
+
+ import Time._
+
+ val latestPersistedRddId = persistentRddIds(persistentRddIds.keySet.max)
+ val earliestPersistedRddId = persistentRddIds(persistentRddIds.keySet.min)
+ val latestBlockRdd = blockRdds(blockRdds.keySet.max)
+ val earliestBlockRdd = blockRdds(blockRdds.keySet.min)
+ // verify that the latest mapped RDD is persisted but the earliest one has been unpersisted
+ assert(ssc.sparkContext.persistentRdds.contains(latestPersistedRddId))
+ assert(!ssc.sparkContext.persistentRdds.contains(earliestPersistedRddId))
+
+ // verify that the latest input blocks are present but the earliest blocks have been removed
+ assert(latestBlockRdd.isValid)
+ assert(latestBlockRdd.collect != null)
+ assert(!earliestBlockRdd.isValid)
+ earliestBlockRdd.blockIds.foreach { blockId =>
+ assert(!ssc.sparkContext.env.blockManager.master.contains(blockId))
+ }
+ ssc.stop()
+ }
+
/** Test cleanup of RDDs in DStream metadata */
def runCleanupTest[T: ClassTag](
conf2: SparkConf,
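Note: the new test drives batch processing with ManualClock rather than the system clock, so each addToTime(batchDuration.milliseconds) call releases exactly one batch deterministically. The driving loop, reduced to its essentials (this assumes a StreamingContext ssc whose conf sets spark.streaming.clock to ManualClock, which the suite's TestSuiteBase configuration is expected to do):

    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    testServer.send("data\n")                    // feed one record to the receiver
    Thread.sleep(200)                            // let the receiver store the block
    clock.addToTime(batchDuration.milliseconds)  // trigger exactly one batch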
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 3bad871b5c..b55b7834c9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -42,8 +42,6 @@ import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
- val testPort = 9999
-
test("socket input stream") {
// Start the server
val testServer = new TestServer()
@@ -288,17 +286,6 @@ class TestServer(portToBind: Int = 0) extends Logging {
def port = serverSocket.getLocalPort
}
-object TestServer {
- def main(args: Array[String]) {
- val s = new TestServer()
- s.start()
- while(true) {
- Thread.sleep(1000)
- s.send("hello")
- }
- }
-}
-
/** This is an actor for testing actor input stream */
class TestActor(port: Int) extends Actor with ActorHelper {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
index 45304c76b0..ff3619a590 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -29,6 +29,7 @@ import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
+import scala.language.postfixOps
/** Testsuite for testing the network receiver behavior */
class NetworkReceiverSuite extends FunSuite with Timeouts {
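Note: the added scala.language.postfixOps import silences the postfix-operator feature warning triggered by SpanSugar-style durations used in this suite. An illustrative example only, assuming the suite's Timeouts mixin:

    import org.scalatest.time.SpanSugar._
    import scala.language.postfixOps

    failAfter(10 seconds) {  // postfix `seconds` needs the language import
      // ... receiver assertions ...
    }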