diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-18 02:12:41 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-18 02:12:41 -0800 |
commit | 8ad561dc7d6475d7b217ec3f57bac3b584fed31a (patch) | |
tree | 84f990d67335abc27c5ba126c75f31d4452a0295 /streaming/src | |
parent | f98c7da23ef66812b8b4888230ee98c07f09af23 (diff) | |
download | spark-8ad561dc7d6475d7b217ec3f57bac3b584fed31a.tar.gz spark-8ad561dc7d6475d7b217ec3f57bac3b584fed31a.tar.bz2 spark-8ad561dc7d6475d7b217ec3f57bac3b584fed31a.zip |
Added checkpointing and fault-tolerance semantics to the programming guide. Fixed default checkpoint interval to being a multiple of slide duration. Fixed visibility of some classes and objects to clean up docs.
Diffstat (limited to 'streaming/src')
6 files changed, 11 insertions, 6 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 84e4b5bedb..e1be5ef51c 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -132,7 +132,7 @@ abstract class DStream[T: ClassManifest] ( // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger if (mustCheckpoint && checkpointDuration == null) { - checkpointDuration = slideDuration.max(Seconds(10)) + checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt logInfo("Checkpoint interval automatically set to " + checkpointDuration) } diff --git a/streaming/src/main/scala/spark/streaming/Duration.scala b/streaming/src/main/scala/spark/streaming/Duration.scala index e4dc579a17..ee26206e24 100644 --- a/streaming/src/main/scala/spark/streaming/Duration.scala +++ b/streaming/src/main/scala/spark/streaming/Duration.scala @@ -16,7 +16,7 @@ case class Duration (private val millis: Long) { def * (times: Int): Duration = new Duration(millis * times) - def / (that: Duration): Long = millis / that.millis + def / (that: Duration): Double = millis.toDouble / that.millis.toDouble def isMultipleOf(that: Duration): Boolean = (this.millis % that.millis == 0) diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala index dc21dfb722..6a8b81760e 100644 --- a/streaming/src/main/scala/spark/streaming/Interval.scala +++ b/streaming/src/main/scala/spark/streaming/Interval.scala @@ -30,6 +30,7 @@ class Interval(val beginTime: Time, val endTime: Time) { override def toString = "[" + beginTime + ", " + endTime + "]" } +private[streaming] object Interval { def currentInterval(duration: Duration): Interval = { val time = new Time(System.currentTimeMillis) diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index 5127db3bbc..5a2dd46fa0 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -18,8 +18,8 @@ import org.apache.hadoop.conf.Configuration class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)]) extends Serializable { - - def ssc = self.ssc + + private[streaming] def ssc = self.ssc private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = { new HashPartitioner(numPartitions) @@ -242,7 +242,9 @@ extends Serializable { * Return a new DStream by applying incremental `reduceByKey` over a sliding window. * The reduced value of over a new window is calculated using the old window's reduced value : * 1. reduce the new values that entered the window (e.g., adding new counts) + * * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. * However, it is applicable to only "invertible reduce functions". * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. @@ -399,7 +401,7 @@ extends Serializable { } /** - * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` + * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this` * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that * key in both RDDs. Partitioner is used to partition each generated RDD. */ diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala index e70822e5c3..0e21b7480c 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala @@ -13,6 +13,7 @@ import twitter4j.auth.BasicAuthorization * An optional set of string filters can be used to restrict the set of tweets. The Twitter API is * such that this may return a sampled subset of all tweets during each interval. */ +private[streaming] class TwitterInputDStream( @transient ssc_ : StreamingContext, username: String, @@ -26,6 +27,7 @@ class TwitterInputDStream( } } +private[streaming] class TwitterReceiver( username: String, password: String, diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index 5250667bcb..cac86deeaf 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -50,7 +50,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { val stateStreamCheckpointInterval = Seconds(1) // this ensure checkpointing occurs at least once - val firstNumBatches = (stateStreamCheckpointInterval / batchDuration) * 2 + val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2 val secondNumBatches = firstNumBatches // Setup the streams |