Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala          | 14
2 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 2a7004e56e..e0542eda13 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -51,7 +51,15 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* operator, so this DStream will be registered as an output stream and there materialized.
*/
def print(): Unit = {
- dstream.print()
+ print(10)
+ }
+
+ /**
+ * Print the first num elements of each RDD generated in this DStream. This is an output
+ * operator, so this DStream will be registered as an output stream and there materialized.
+ */
+ def print(num: Int): Unit = {
+ dstream.print(num)
}
/**
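The Java-facing overload above forwards to the underlying Scala DStream, so existing print() call sites keep compiling while callers gain a bounded variant. A minimal usage sketch, assuming a local master and a placeholder socket source on localhost:9999 (neither is part of this patch):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.api.java.JavaStreamingContext

    val conf = new SparkConf().setMaster("local[2]").setAppName("JavaPrintNum")
    val jssc = new JavaStreamingContext(conf, Seconds(1))

    // Hypothetical source; any JavaDStream behaves the same way.
    val lines = jssc.socketTextStream("localhost", 9999)

    lines.print()   // unchanged default: first 10 elements of each batch
    lines.print(5)  // new overload: first 5 elements of each batch

    jssc.start()
    jssc.awaitTermination()
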
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 7f8651e719..28fc00cf39 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -605,13 +605,21 @@ abstract class DStream[T: ClassTag] (
* operator, so this DStream will be registered as an output stream and there materialized.
*/
def print() {
+ print(10)
+ }
+
+ /**
+ * Print the first num elements of each RDD generated in this DStream. This is an output
+ * operator, so this DStream will be registered as an output stream and there materialized.
+ */
+ def print(num: Int) {
def foreachFunc = (rdd: RDD[T], time: Time) => {
- val first11 = rdd.take(11)
+ val firstNum = rdd.take(num + 1)
println ("-------------------------------------------")
println ("Time: " + time)
println ("-------------------------------------------")
- first11.take(10).foreach(println)
- if (first11.size > 10) println("...")
+ firstNum.take(num).foreach(println)
+ if (firstNum.size > num) println("...")
println()
}
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
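On the Scala side, the implementation takes num + 1 elements from each RDD: the extra element is only used to decide whether to append "...", so no full count of the RDD is needed. A self-contained sketch of the new overload, assuming a local master and an illustrative queue-backed stream (object and data names are made up for this example):

    import scala.collection.mutable
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object PrintNumExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("PrintNumExample")
        val ssc = new StreamingContext(conf, Seconds(1))

        // One batch containing the numbers 1 to 20 (illustrative data only).
        val queue = mutable.Queue[RDD[Int]](ssc.sparkContext.makeRDD(1 to 20))
        val stream = ssc.queueStream(queue)

        stream.print()   // existing behaviour: prints 1 to 10, then "..."
        stream.print(3)  // new overload: prints 1, 2, 3, then "..."

        ssc.start()
        ssc.awaitTermination()
      }
    }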