author    Yadong Qi <qiyadong2010@gmail.com>          2015-01-02 15:09:41 -0800
committer Tathagata Das <tathagata.das1565@gmail.com> 2015-01-02 15:09:41 -0800
commit    bd88b7185358ae60efc83dc6cbb3fb1d2bff6074 (patch)
tree      aa59d0302cd6dda73f527d9a76fb25030f745b4b
parent    012839807c3dc6e7c8c41ac6e956d52a550bb031 (diff)
[SPARK-3325][Streaming] Add a parameter to the method print in class DStream
This PR is a fixed version of the original PR #3237 by watermen and scwf.
It adds the ability to specify how many elements to print in `DStream.print`.

Author: Yadong Qi <qiyadong2010@gmail.com>
Author: q00251598 <qiyadong@huawei.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: wangfei <wangfei1@huawei.com>

Closes #3865 from tdas/print-num and squashes the following commits:

cd34e9e [Tathagata Das] Fix bug
7c09f16 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into HEAD
bb35d1a [Yadong Qi] Update MimaExcludes.scala
f8098ca [Yadong Qi] Update MimaExcludes.scala
f6ac3cb [Yadong Qi] Update MimaExcludes.scala
e4ed897 [Yadong Qi] Update MimaExcludes.scala
3b9d5cf [wangfei] fix conflicts
ec8a3af [q00251598] move to Spark 1.3
26a70c0 [q00251598] extend the Python DStream's print
b589a4b [q00251598] add another print function
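For context, a minimal driver sketch of the new API (the print signatures come from this patch; the socket source on port 9999 and the object name are hypothetical):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PrintNumExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PrintNumExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Hypothetical source: lines of text from a local socket.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.print()    // unchanged default: first 10 elements of each batch
    lines.print(15)  // new overload: first 15 elements of each batch
    ssc.start()
    ssc.awaitTermination()
  }
}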
 project/MimaExcludes.scala                                                          |  3 +++
 python/pyspark/streaming/dstream.py                                                 | 12 +++++++-----
 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala  | 10 +++++++++-
 streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala           | 14 +++++++++++---
 4 files changed, 30 insertions(+), 9 deletions(-)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index c377e5cffa..31d4c317ae 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -54,6 +54,9 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.mllib.linalg.Matrices.rand")
           ) ++ Seq(
+            // SPARK-3325
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.streaming.api.java.JavaDStreamLike.print"),
             // SPARK-2757
             ProblemFilters.exclude[IncompatibleResultTypeProblem](
               "org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler." +
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 0826ddc56e..2fe39392ff 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -157,18 +157,20 @@ class DStream(object):
         api = self._ssc._jvm.PythonDStream
         api.callForeachRDD(self._jdstream, jfunc)
 
-    def pprint(self):
+    def pprint(self, num=10):
         """
-        Print the first ten elements of each RDD generated in this DStream.
+        Print the first num elements of each RDD generated in this DStream.
+
+        @param num: the number of elements from the first will be printed.
         """
         def takeAndPrint(time, rdd):
-            taken = rdd.take(11)
+            taken = rdd.take(num + 1)
             print "-------------------------------------------"
             print "Time: %s" % time
             print "-------------------------------------------"
-            for record in taken[:10]:
+            for record in taken[:num]:
                 print record
-            if len(taken) > 10:
+            if len(taken) > num:
                 print "..."
             print
 
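Both implementations rely on the same take(num + 1) trick: fetching one element more than will be printed tells the driver whether the RDD holds more than num records, so it can append "..." without counting the whole dataset. A standalone sketch of the pattern (plain Scala collections, no Spark; printFirst is an illustrative name):

// Illustrative helper, not part of the patch.
def printFirst[T](elems: Seq[T], num: Int = 10): Unit = {
  val firstNum = elems.take(num + 1)       // at most one extra element
  firstNum.take(num).foreach(println)      // print the first num elements
  if (firstNum.size > num) println("...")  // an extra element means truncation
}

printFirst(1 to 25, 5)  // prints 1 through 5, then "..."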
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 2a7004e56e..e0542eda13 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -51,7 +51,15 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * operator, so this DStream will be registered as an output stream and there materialized.
    */
   def print(): Unit = {
-    dstream.print()
+    print(10)
+  }
+
+  /**
+   * Print the first num elements of each RDD generated in this DStream. This is an output
+   * operator, so this DStream will be registered as an output stream and there materialized.
+   */
+  def print(num: Int): Unit = {
+    dstream.print(num)
   }
 
   /**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 7f8651e719..28fc00cf39 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -605,13 +605,21 @@ abstract class DStream[T: ClassTag] (
    * operator, so this DStream will be registered as an output stream and there materialized.
    */
   def print() {
+    print(10)
+  }
+
+  /**
+   * Print the first num elements of each RDD generated in this DStream. This is an output
+   * operator, so this DStream will be registered as an output stream and there materialized.
+   */
+  def print(num: Int) {
     def foreachFunc = (rdd: RDD[T], time: Time) => {
-      val first11 = rdd.take(11)
+      val firstNum = rdd.take(num + 1)
       println ("-------------------------------------------")
       println ("Time: " + time)
       println ("-------------------------------------------")
-      first11.take(10).foreach(println)
-      if (first11.size > 10) println("...")
+      firstNum.take(num).foreach(println)
+      if (firstNum.size > num) println("...")
       println()
     }
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
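To see the new overload end to end, a queue-backed stream gives a deterministic batch to print from (a sketch; queueStream and makeRDD are standard Spark APIs, the object name is illustrative):

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PrintNumCheck {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("PrintNumCheck").setMaster("local[2]"), Seconds(1))
    // One deterministic batch containing 1..100.
    val queue = mutable.Queue[RDD[Int]](ssc.sparkContext.makeRDD(1 to 100))
    ssc.queueStream(queue).print(3)
    // Expected batch output: the "Time: ..." header, then 1, 2, 3, then "..."
    ssc.start()
    ssc.awaitTermination()
  }
}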