aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/dstream.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/dstream.py')
-rw-r--r--python/pyspark/streaming/dstream.py12
1 files changed, 7 insertions, 5 deletions
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 0826ddc56e..2fe39392ff 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -157,18 +157,20 @@ class DStream(object):
api = self._ssc._jvm.PythonDStream
api.callForeachRDD(self._jdstream, jfunc)
- def pprint(self):
+ def pprint(self, num=10):
"""
- Print the first ten elements of each RDD generated in this DStream.
+ Print the first num elements of each RDD generated in this DStream.
+
+ @param num: the number of elements from the first will be printed.
"""
def takeAndPrint(time, rdd):
- taken = rdd.take(11)
+ taken = rdd.take(num + 1)
print "-------------------------------------------"
print "Time: %s" % time
print "-------------------------------------------"
- for record in taken[:10]:
+ for record in taken[:num]:
print record
- if len(taken) > 10:
+ if len(taken) > num:
print "..."
print