author Tathagata Das <tathagata.das1565@gmail.com> 2014-01-12 21:59:51 -0800
committer Tathagata Das <tathagata.das1565@gmail.com> 2014-01-12 21:59:51 -0800
commit 777c181d2f583570956724f9cbe20eb1dc7048f1 (patch)
tree 4e042deb8fae3663d8b1692c69c666d7ab9a5f1e
parent 034f89aaab1db95e8908432f2445d6841526efcf (diff)
parent 405bfe86ef9c3021358d2ac89192857478861fe0 (diff)
Merge remote-tracking branch 'apache/master' into dstream-move
Conflicts: streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
-rw-r--r-- docs/streaming-programming-guide.md | 4
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala | 2
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala | 2
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala | 4
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala | 4
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala | 4
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala | 2
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 26
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 29
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 4
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 2
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 2
12 files changed, 61 insertions, 24 deletions
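
Reviewer note: the diff that follows renames the `foreach` output operator to `foreachRDD` and keeps `foreach` as a deprecated forwarder. The sketch below illustrates that forwarding pattern in isolation. `MiniStream` is a hypothetical stand-in that models a stream as a fixed sequence of batches; it is not Spark's `DStream`, and the batch index is only an assumed substitute for Spark's `Time` argument.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a DStream: a fixed sequence of batches.
// This is NOT Spark's API; it only mirrors the shape of the rename.
final class MiniStream[T](batches: Seq[Seq[T]]) {

  /** New name, two-argument form: receives each batch and its index. */
  def foreachRDD(func: (Seq[T], Int) => Unit): Unit =
    batches.zipWithIndex.foreach { case (batch, index) => func(batch, index) }

  /** New name, one-argument form: forwards to the two-argument form. */
  def foreachRDD(func: Seq[T] => Unit): Unit =
    foreachRDD((batch: Seq[T], index: Int) => func(batch))

  /** Old name, kept so existing callers still compile (with a warning). */
  @deprecated("use foreachRDD", "0.9.0")
  def foreach(func: Seq[T] => Unit): Unit = foreachRDD(func)
}

// Usage: collect the sum of every batch through the new name.
val seen = ArrayBuffer.empty[Int]
new MiniStream(Seq(Seq(1, 2), Seq(3)))
  .foreachRDD((batch: Seq[Int]) => seen += batch.sum)
```

Because the old name only forwards, callers can migrate one call site at a time, and the compiler's deprecation warning points each remaining `foreach` caller at `foreachRDD`.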
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index cec1b75baf..4e8a680a75 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -175,7 +175,7 @@ When an output operator is called, it triggers the computation of a stream. Curr
<table class="table">
<tr><th style="width:30%">Operator</th><th>Meaning</th></tr>
<tr>
- <td> <b>foreach</b>(<i>func</i>) </td>
+ <td> <b>foreachRDD</b>(<i>func</i>) </td>
<td> The fundamental output operator. Applies a function, <i>func</i>, to each RDD generated from the stream. This function should have side effects, such as printing output, saving the RDD to external files, or writing it over the network to an external system. </td>
</tr>
@@ -375,7 +375,7 @@ There are two failure behaviors based on which input sources are used.
1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can re-computed and therefore no data will be lost due to any failure.
1. _Using any input source that receives data through a network_ - For network-based data sources like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster (default replication factor is 2). So if a worker node fails, then the system can recompute the lost from the the left over copy of the input data. However, if the worker node where a network receiver was running fails, then a tiny bit of data may be lost, that is, the data received by the system but not yet replicated to other node(s). The receiver will be started on a different node and it will continue to receive data.
-Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation always leads to the same result. As a result, all DStream transformations are guaranteed to have _exactly-once_ semantics. That is, the final transformed result will be same even if there were was a worker node failure. However, output operations (like `foreach`) have _at-least once_ semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to HDFS using the `saveAs*Files` operations (as the file will simply get over-written by the same data), additional transactions-like mechanisms may be necessary to achieve exactly-once semantics for output operations.
+Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation always leads to the same result. As a result, all DStream transformations are guaranteed to have _exactly-once_ semantics. That is, the final transformed result will be same even if there were was a worker node failure. However, output operations (like `foreachRDD`) have _at-least once_ semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to HDFS using the `saveAs*Files` operations (as the file will simply get over-written by the same data), additional transactions-like mechanisms may be necessary to achieve exactly-once semantics for output operations.
## Failure of the Driver Node
A system that is required to operate 24/7 needs to be able tolerate the failure of the driver node as well. Spark Streaming does this by saving the state of the DStream computation periodically to a HDFS file, that can be used to restart the streaming computation in the event of a failure of the driver node. This checkpointing is enabled by setting a HDFS directory for checkpointing using `ssc.checkpoint(<checkpoint directory>)` as described [earlier](#rdd-checkpointing-within-dstreams). To elaborate, the following state is periodically saved to a file.
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
index 3d08d86567..99b79c3949 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
@@ -58,7 +58,7 @@ object RawNetworkGrep {
val rawStreams = (1 to numStreams).map(_ =>
ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
val union = ssc.union(rawStreams)
- union.filter(_.contains("the")).count().foreach(r =>
+ union.filter(_.contains("the")).count().foreachRDD(r =>
println("Grep count: " + r.collect().mkString))
ssc.start()
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index d51e6e9418..8c5d0bd568 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -82,7 +82,7 @@ object RecoverableNetworkWordCount {
val lines = ssc.socketTextStream(ip, port)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.foreach((rdd: RDD[(String, Int)], time: Time) => {
+ wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
println(counts)
println("Appending to " + outputFile.getAbsolutePath)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 80b5a98b14..483c4d3118 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -81,7 +81,7 @@ object TwitterAlgebirdCMS {
val exactTopUsers = users.map(id => (id, 1))
.reduceByKey((a, b) => a + b)
- approxTopUsers.foreach(rdd => {
+ approxTopUsers.foreachRDD(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
val partialTopK = partial.heavyHitters.map(id =>
@@ -96,7 +96,7 @@ object TwitterAlgebirdCMS {
}
})
- exactTopUsers.foreach(rdd => {
+ exactTopUsers.foreachRDD(rdd => {
if (rdd.count() != 0) {
val partialMap = rdd.collect().toMap
val partialTopK = rdd.map(
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index cb2f2c51a0..94c2bf29ac 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -67,7 +67,7 @@ object TwitterAlgebirdHLL {
val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
- approxUsers.foreach(rdd => {
+ approxUsers.foreachRDD(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
globalHll += partial
@@ -76,7 +76,7 @@ object TwitterAlgebirdHLL {
}
})
- exactUsers.foreach(rdd => {
+ exactUsers.foreachRDD(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
userSet ++= partial
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
index 16c10feaba..8a70d4a978 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
@@ -56,13 +56,13 @@ object TwitterPopularTags {
// Print popular hashtags
- topCounts60.foreach(rdd => {
+ topCounts60.foreachRDD(rdd => {
val topList = rdd.take(5)
println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
- topCounts10.foreach(rdd => {
+ topCounts10.foreachRDD(rdd => {
val topList = rdd.take(5)
println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
index da6b67bcce..bb44bc3d06 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -91,7 +91,7 @@ object PageViewStream {
case "popularUsersSeen" =>
// Look for users in our existing dataset and print it out if we have a match
pageViews.map(view => (view.userID, 1))
- .foreach((rdd, time) => rdd.join(userList)
+ .foreachRDD((rdd, time) => rdd.join(userList)
.map(_._2._2)
.take(10)
.foreach(u => println("Saw user %s at time %s".format(u, time))))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index d3cd52ad7c..1ec4492bca 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -244,17 +244,39 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
+ *
+ * @deprecated As of release 0.9.0, replaced by foreachRDD
*/
+ @Deprecated
def foreach(foreachFunc: JFunction[R, Void]) {
- dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd)))
+ foreachRDD(foreachFunc)
}
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
+ *
+ * @deprecated As of release 0.9.0, replaced by foreachRDD
*/
+ @Deprecated
def foreach(foreachFunc: JFunction2[R, Time, Void]) {
- dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
+ foreachRDD(foreachFunc)
+ }
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreachRDD(foreachFunc: JFunction[R, Void]) {
+ dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
+ }
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreachRDD(foreachFunc: JFunction2[R, Time, Void]) {
+ dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 8014db661f..a7c4cca7ea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -17,10 +17,12 @@
package org.apache.spark.streaming.dstream
+
+import scala.deprecated
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag
-import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
@@ -31,7 +33,6 @@ import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.scheduler.Job
import org.apache.spark.streaming.Duration
-
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
* sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
@@ -488,15 +489,29 @@ abstract class DStream[T: ClassTag] (
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
- def foreach(foreachFunc: RDD[T] => Unit) {
- this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
+ @deprecated("use foreachRDD", "0.9.0")
+ def foreach(foreachFunc: RDD[T] => Unit) = this.foreachRDD(foreachFunc)
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ */
+ @deprecated("use foreachRDD", "0.9.0")
+ def foreach(foreachFunc: (RDD[T], Time) => Unit) = this.foreachRDD(foreachFunc)
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreachRDD(foreachFunc: RDD[T] => Unit) {
+ this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
}
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
- def foreach(foreachFunc: (RDD[T], Time) => Unit) {
+ def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
}
@@ -720,7 +735,7 @@ abstract class DStream[T: ClassTag] (
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsObjectFile(file)
}
- this.foreach(saveFunc)
+ this.foreachRDD(saveFunc)
}
/**
@@ -733,7 +748,7 @@ abstract class DStream[T: ClassTag] (
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsTextFile(file)
}
- this.foreach(saveFunc)
+ this.foreachRDD(saveFunc)
}
def register() {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index f71dd17b2f..6b3e48382e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -583,7 +583,7 @@ extends Serializable {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
}
- self.foreach(saveFunc)
+ self.foreachRDD(saveFunc)
}
/**
@@ -613,7 +613,7 @@ extends Serializable {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
}
- self.foreach(saveFunc)
+ self.foreachRDD(saveFunc)
}
private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index d293d20644..7037aae234 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -380,7 +380,7 @@ class BasicOperationsSuite extends TestSuiteBase {
val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
val stream = new TestInputStream[Int](ssc, input, 2)
ssc.registerInputStream(stream)
- stream.foreach(_ => {}) // Dummy output stream
+ stream.foreachRDD(_ => {}) // Dummy output stream
ssc.start()
Thread.sleep(2000)
def getInputFromSlice(fromMillis: Long, toMillis: Long) = {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index a4d0f9f978..f7f3346f81 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -187,7 +187,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
inputStream.map(x => { throw new TestException("error in map task"); x})
- .foreach(_.count)
+ .foreachRDD(_.count)
val exception = intercept[Exception] {
ssc.start()