From bd88b7185358ae60efc83dc6cbb3fb1d2bff6074 Mon Sep 17 00:00:00 2001
From: Yadong Qi
Date: Fri, 2 Jan 2015 15:09:41 -0800
Subject: [SPARK-3325][Streaming] Add a parameter to the method print in class DStream

This PR is a fixed version of the original PR #3237 by watermen and scwf.
This adds the ability to specify how many elements to print in `DStream.print`.

Author: Yadong Qi
Author: q00251598
Author: Tathagata Das
Author: wangfei

Closes #3865 from tdas/print-num and squashes the following commits:

cd34e9e [Tathagata Das] Fix bug
7c09f16 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into HEAD
bb35d1a [Yadong Qi] Update MimaExcludes.scala
f8098ca [Yadong Qi] Update MimaExcludes.scala
f6ac3cb [Yadong Qi] Update MimaExcludes.scala
e4ed897 [Yadong Qi] Update MimaExcludes.scala
3b9d5cf [wangfei] fix conflicts
ec8a3af [q00251598] move to Spark 1.3
26a70c0 [q00251598] extend the Python DStream's print
b589a4b [q00251598] add another print function
---
 .../apache/spark/streaming/api/java/JavaDStreamLike.scala  | 10 +++++++++-
 .../scala/org/apache/spark/streaming/dstream/DStream.scala | 14 +++++++++++---
 2 files changed, 20 insertions(+), 4 deletions(-)

(limited to 'streaming')

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 2a7004e56e..e0542eda13 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -51,7 +51,15 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * operator, so this DStream will be registered as an output stream and there materialized.
    */
   def print(): Unit = {
-    dstream.print()
+    print(10)
+  }
+
+  /**
+   * Print the first num elements of each RDD generated in this DStream. This is an output
+   * operator, so this DStream will be registered as an output stream and there materialized.
+   */
+  def print(num: Int): Unit = {
+    dstream.print(num)
   }
 
   /**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 7f8651e719..28fc00cf39 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -605,13 +605,21 @@ abstract class DStream[T: ClassTag] (
    * operator, so this DStream will be registered as an output stream and there materialized.
    */
   def print() {
+    print(10)
+  }
+
+  /**
+   * Print the first num elements of each RDD generated in this DStream. This is an output
+   * operator, so this DStream will be registered as an output stream and there materialized.
+   */
+  def print(num: Int) {
     def foreachFunc = (rdd: RDD[T], time: Time) => {
-      val first11 = rdd.take(11)
+      val firstNum = rdd.take(num + 1)
       println ("-------------------------------------------")
       println ("Time: " + time)
       println ("-------------------------------------------")
-      first11.take(10).foreach(println)
-      if (first11.size > 10) println("...")
+      firstNum.take(num).foreach(println)
+      if (firstNum.size > num) println("...")
       println()
     }
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
--
cgit v1.2.3
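
For context, here is a minimal usage sketch of the overload introduced by this patch. The `StreamingContext` setup and the socket source on `localhost:9999` are illustrative assumptions, not part of the change; note that the implementation takes `num + 1` elements per batch so it can decide whether to print a trailing "..." when more elements exist.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PrintNumExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("PrintNumExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Hypothetical input source; any DStream works the same way.
    val lines = ssc.socketTextStream("localhost", 9999)

    // Existing behavior: prints the first 10 elements of each batch.
    lines.print()

    // New overload from this patch: prints the first 25 elements of each batch,
    // followed by "..." if the batch contains more than 25 elements.
    lines.print(25)

    ssc.start()
    ssc.awaitTermination()
  }
}
```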