author     Tathagata Das <tathagata.das1565@gmail.com>  2013-02-18 02:12:41 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2013-02-18 02:12:41 -0800
commit     8ad561dc7d6475d7b217ec3f57bac3b584fed31a (patch)
tree       84f990d67335abc27c5ba126c75f31d4452a0295 /streaming
parent     f98c7da23ef66812b8b4888230ee98c07f09af23 (diff)
Added checkpointing and fault-tolerance semantics to the programming guide. Fixed the default checkpoint interval to be a multiple of the slide duration. Fixed the visibility of some classes and objects to clean up the docs.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                      2
-rw-r--r--  streaming/src/main/scala/spark/streaming/Duration.scala                     2
-rw-r--r--  streaming/src/main/scala/spark/streaming/Interval.scala                     1
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala         8
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala  2
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala              2
6 files changed, 11 insertions, 6 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 84e4b5bedb..e1be5ef51c 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -132,7 +132,7 @@ abstract class DStream[T: ClassManifest] (
// Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
if (mustCheckpoint && checkpointDuration == null) {
- checkpointDuration = slideDuration.max(Seconds(10))
+ checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
logInfo("Checkpoint interval automatically set to " + checkpointDuration)
}
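
The change above replaces a simple max with round-up-to-a-multiple logic. A minimal sketch of the new behavior, using a toy Duration rather than the project classes:

object CheckpointIntervalSketch {
  case class Duration(millis: Long) {
    def * (times: Int): Duration = Duration(millis * times)
    def / (that: Duration): Double = millis.toDouble / that.millis.toDouble
  }
  def Seconds(n: Long): Duration = Duration(n * 1000)

  // Round Seconds(10) up to the nearest whole multiple of the slide duration.
  def defaultCheckpointInterval(slide: Duration): Duration =
    slide * math.ceil(Seconds(10) / slide).toInt

  // slide = 3s:  3s * ceil(10/3)   = 3s * 4  = 12s  (old max() gave 10s, not a multiple of 3s)
  // slide = 15s: 15s * ceil(10/15) = 15s * 1 = 15s  (unchanged; already at least 10s)
}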
diff --git a/streaming/src/main/scala/spark/streaming/Duration.scala b/streaming/src/main/scala/spark/streaming/Duration.scala
index e4dc579a17..ee26206e24 100644
--- a/streaming/src/main/scala/spark/streaming/Duration.scala
+++ b/streaming/src/main/scala/spark/streaming/Duration.scala
@@ -16,7 +16,7 @@ case class Duration (private val millis: Long) {
def * (times: Int): Duration = new Duration(millis * times)
- def / (that: Duration): Long = millis / that.millis
+ def / (that: Duration): Double = millis.toDouble / that.millis.toDouble
def isMultipleOf(that: Duration): Boolean =
(this.millis % that.millis == 0)
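
The return-type change matters because Long division truncates, so the ceil() in the DStream change above could never round up. A quick illustration with the toy Duration from the sketch above:

import CheckpointIntervalSketch._

val ten   = Seconds(10)
val three = Seconds(3)

ten.millis / three.millis      // 3         -- old Long semantics: fraction truncated
ten / three                    // 3.333...  -- new Double semantics
math.ceil(ten / three).toInt   // 4         -- can now round up to a whole multiple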
diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala
index dc21dfb722..6a8b81760e 100644
--- a/streaming/src/main/scala/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/spark/streaming/Interval.scala
@@ -30,6 +30,7 @@ class Interval(val beginTime: Time, val endTime: Time) {
override def toString = "[" + beginTime + ", " + endTime + "]"
}
+private[streaming]
object Interval {
def currentInterval(duration: Duration): Interval = {
val time = new Time(System.currentTimeMillis)
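
This and the TwitterInputDStream change below are visibility fixes: package-private members disappear from the user-facing API and its generated docs. A sketch of what private[streaming] means (the object names here are invented):

package spark.streaming {
  private[streaming] object Hidden {    // visible only within spark.streaming
    def now: Long = System.currentTimeMillis
  }
  object Visible {
    def use: Long = Hidden.now          // OK: same package
  }
}
// Outside the package, a reference to spark.streaming.Hidden would not
// compile, and scaladoc omits it from the documented API.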
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 5127db3bbc..5a2dd46fa0 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -18,8 +18,8 @@ import org.apache.hadoop.conf.Configuration
class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)])
extends Serializable {
-
- def ssc = self.ssc
+
+ private[streaming] def ssc = self.ssc
private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
new HashPartitioner(numPartitions)
@@ -242,7 +242,9 @@ extends Serializable {
* Return a new DStream by applying incremental `reduceByKey` over a sliding window.
* The reduced value of over a new window is calculated using the old window's reduced value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
+ *
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ *
* This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
* However, it is applicable to only "invertible reduce functions".
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
@@ -399,7 +401,7 @@ extends Serializable {
}
/**
- * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+ * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this`
* or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
* key in both RDDs. Partitioner is used to partition each generated RDD.
*/
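
The reworded scaladoc above describes the invertible-window overload. A hypothetical use of it; the wordCounts stream and the window/slide values are made up, with the overload shape taken from the doc comment:

// Each slide, add counts for batches entering the 30-second window and
// subtract counts for batches leaving it, instead of re-reducing the
// whole window from scratch.
val windowedCounts = wordCounts.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,   // reduce: values entering the window
  (a: Int, b: Int) => a - b,   // inverse reduce: values leaving the window
  Seconds(30),                 // window duration
  Seconds(10)                  // slide duration
)

Subtraction is a valid inverse of addition, so counting qualifies; a non-invertible function such as math.max would have to use the plain reduceByKeyAndWindow instead.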
diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
index e70822e5c3..0e21b7480c 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
@@ -13,6 +13,7 @@ import twitter4j.auth.BasicAuthorization
* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
* such that this may return a sampled subset of all tweets during each interval.
*/
+private[streaming]
class TwitterInputDStream(
@transient ssc_ : StreamingContext,
username: String,
@@ -26,6 +27,7 @@ class TwitterInputDStream(
}
}
+private[streaming]
class TwitterReceiver(
username: String,
password: String,
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 5250667bcb..cac86deeaf 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -50,7 +50,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val stateStreamCheckpointInterval = Seconds(1)
// this ensure checkpointing occurs at least once
- val firstNumBatches = (stateStreamCheckpointInterval / batchDuration) * 2
+ val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2
val secondNumBatches = firstNumBatches
// Setup the streams
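
The .toLong above is a direct consequence of Duration./ now returning Double. A toy recap reusing the sketch's Duration; the 500 ms batch duration is an assumption for illustration, not taken from the suite:

import CheckpointIntervalSketch._

val stateStreamCheckpointInterval = Seconds(1)
val batchDuration = Duration(500)   // assumed; not shown in this hunk

// The division now yields a Double, so an explicit conversion is needed
// before computing a whole number of batches.
val firstNumBatches: Long = (stateStreamCheckpointInterval / batchDuration).toLong * 2  // 4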