path: root/streaming/src/test
author    Tathagata Das <tathagata.das1565@gmail.com>  2015-11-18 16:08:06 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2015-11-18 16:08:06 -0800
commit    a402c92c92b2e1c85d264f6077aec8f6d6a08270 (patch)
tree      31f502bd6fed297f05de7ed60f5bde7de9663dd5 /streaming/src/test
parent    4b117121900e5f242e7c8f46a69164385f0da7cc (diff)
download  spark-a402c92c92b2e1c85d264f6077aec8f6d6a08270.tar.gz
          spark-a402c92c92b2e1c85d264f6077aec8f6d6a08270.tar.bz2
          spark-a402c92c92b2e1c85d264f6077aec8f6d6a08270.zip
[SPARK-11814][STREAMING] Add better default checkpoint duration
DStream checkpoint interval is by default set to max(10 seconds, batch interval). That is bad for large batch intervals, where the checkpoint interval equals the batch interval and RDDs get checkpointed every batch. This PR sets the checkpoint interval of trackStateByKey to 10 * batch duration.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #9805 from tdas/SPARK-11814.
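For context, the rule being tested below can be shown with a small standalone sketch. This is not the Spark implementation; CheckpointDefaults, oldDefault, and newDefault are hypothetical names. The pre-existing DStream default is max(10 seconds, batch interval), while this change gives the internal state stream of trackStateByKey a default of 10 * batch interval.

// Illustrative sketch only -- not the Spark source. Names are hypothetical
// and exist solely to show the rule described in the commit message above.
object CheckpointDefaults {

  // Pre-existing generic DStream default: checkpoint at least every 10 seconds,
  // but never more often than once per batch.
  def oldDefault(batchIntervalMs: Long): Long =
    math.max(10000L, batchIntervalMs)

  // Default applied by this change to trackStateByKey's internal state stream:
  // roughly one checkpoint every 10 batches.
  def newDefault(batchIntervalMs: Long): Long =
    batchIntervalMs * 10

  def main(args: Array[String]): Unit = {
    Seq(100L, 1000L, 10000L).foreach { batch =>
      println(s"batch=${batch}ms  old=${oldDefault(batch)}ms  new=${newDefault(batch)}ms")
    }
    // For a 10 s batch the old rule checkpoints every batch (every 10 s),
    // while the new rule checkpoints every 100 s, matching the
    // testCheckpointDuration expectations in the diff below.
  }
}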
Diffstat (limited to 'streaming/src/test')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala | 44
1 file changed, 43 insertions(+), 1 deletion(-)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala
index e3072b4442..58aef74c00 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala
@@ -22,9 +22,10 @@ import java.io.File
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.reflect.ClassTag
+import org.scalatest.PrivateMethodTester._
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.apache.spark.streaming.dstream.{TrackStateDStream, TrackStateDStreamImpl}
+import org.apache.spark.streaming.dstream.{InternalTrackStateDStream, TrackStateDStream, TrackStateDStreamImpl}
import org.apache.spark.util.{ManualClock, Utils}
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
@@ -57,6 +58,12 @@ class TrackStateByKeySuite extends SparkFunSuite with BeforeAndAfterAll with Bef
sc = new SparkContext(conf)
}
+ override def afterAll(): Unit = {
+ if (sc != null) {
+ sc.stop()
+ }
+ }
+
test("state - get, exists, update, remove, ") {
var state: StateImpl[Int] = null
@@ -436,6 +443,41 @@ class TrackStateByKeySuite extends SparkFunSuite with BeforeAndAfterAll with Bef
assert(collectedStateSnapshots.last.toSet === Set(("a", 1)))
}
+ test("trackStateByKey - checkpoint durations") {
+ val privateMethod = PrivateMethod[InternalTrackStateDStream[_, _, _, _]]('internalStream)
+
+ def testCheckpointDuration(
+ batchDuration: Duration,
+ expectedCheckpointDuration: Duration,
+ explicitCheckpointDuration: Option[Duration] = None
+ ): Unit = {
+ try {
+ ssc = new StreamingContext(sc, batchDuration)
+ val inputStream = new TestInputStream(ssc, Seq.empty[Seq[Int]], 2).map(_ -> 1)
+ val dummyFunc = (value: Option[Int], state: State[Int]) => 0
+ val trackStateStream = inputStream.trackStateByKey(StateSpec.function(dummyFunc))
+ val internalTrackStateStream = trackStateStream invokePrivate privateMethod()
+
+ explicitCheckpointDuration.foreach { d =>
+ trackStateStream.checkpoint(d)
+ }
+ trackStateStream.register()
+ ssc.start() // should initialize all the checkpoint durations
+ assert(trackStateStream.checkpointDuration === null)
+ assert(internalTrackStateStream.checkpointDuration === expectedCheckpointDuration)
+ } finally {
+ StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) }
+ }
+ }
+
+ testCheckpointDuration(Milliseconds(100), Seconds(1))
+ testCheckpointDuration(Seconds(1), Seconds(10))
+ testCheckpointDuration(Seconds(10), Seconds(100))
+
+ testCheckpointDuration(Milliseconds(100), Seconds(2), Some(Seconds(2)))
+ testCheckpointDuration(Seconds(1), Seconds(2), Some(Seconds(2)))
+ testCheckpointDuration(Seconds(10), Seconds(20), Some(Seconds(20)))
+ }
private def testOperation[K: ClassTag, S: ClassTag, T: ClassTag](
input: Seq[Seq[K]],