about summary refs log tree commit diff
path: root/examples/src/main/python/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'examples/src/main/python/streaming')
-rw-r--r-- examples/src/main/python/streaming/hdfs_wordcount.py | 3
-rw-r--r-- examples/src/main/python/streaming/kafka_wordcount.py | 3
-rw-r--r-- examples/src/main/python/streaming/network_wordcount.py | 3
-rw-r--r-- examples/src/main/python/streaming/recoverable_network_wordcount.py | 11
-rw-r--r-- examples/src/main/python/streaming/sql_network_wordcount.py | 5
-rw-r--r-- examples/src/main/python/streaming/stateful_network_wordcount.py | 3
6 files changed, 17 insertions, 11 deletions
diff --git a/examples/src/main/python/streaming/hdfs_wordcount.py b/examples/src/main/python/streaming/hdfs_wordcount.py
index f7ffb53796..f815dd2682 100644
--- a/examples/src/main/python/streaming/hdfs_wordcount.py
+++ b/examples/src/main/python/streaming/hdfs_wordcount.py
@@ -25,6 +25,7 @@
Then create a text file in `localdir` and the words in the file will get counted.
"""
+from __future__ import print_function
import sys
@@ -33,7 +34,7 @@ from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 2:
- print >> sys.stderr, "Usage: hdfs_wordcount.py <directory>"
+ print("Usage: hdfs_wordcount.py <directory>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingHDFSWordCount")
diff --git a/examples/src/main/python/streaming/kafka_wordcount.py b/examples/src/main/python/streaming/kafka_wordcount.py
index 51e1ff822f..b178e7899b 100644
--- a/examples/src/main/python/streaming/kafka_wordcount.py
+++ b/examples/src/main/python/streaming/kafka_wordcount.py
@@ -27,6 +27,7 @@
spark-streaming-kafka-assembly-*.jar examples/src/main/python/streaming/kafka_wordcount.py \
localhost:2181 test`
"""
+from __future__ import print_function
import sys
@@ -36,7 +37,7 @@ from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: kafka_wordcount.py <zk> <topic>"
+ print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
diff --git a/examples/src/main/python/streaming/network_wordcount.py b/examples/src/main/python/streaming/network_wordcount.py
index cfa9c1ff5b..2b48bcfd55 100644
--- a/examples/src/main/python/streaming/network_wordcount.py
+++ b/examples/src/main/python/streaming/network_wordcount.py
@@ -25,6 +25,7 @@
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999`
"""
+from __future__ import print_function
import sys
@@ -33,7 +34,7 @@ from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: network_wordcount.py <hostname> <port>"
+ print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 1)
diff --git a/examples/src/main/python/streaming/recoverable_network_wordcount.py b/examples/src/main/python/streaming/recoverable_network_wordcount.py
index fc6827c82b..ac91f0a06b 100644
--- a/examples/src/main/python/streaming/recoverable_network_wordcount.py
+++ b/examples/src/main/python/streaming/recoverable_network_wordcount.py
@@ -35,6 +35,7 @@
checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
the checkpoint data.
"""
+from __future__ import print_function
import os
import sys
@@ -46,7 +47,7 @@ from pyspark.streaming import StreamingContext
def createContext(host, port, outputPath):
# If you do not see this printed, that means the StreamingContext has been loaded
# from the new checkpoint
- print "Creating new context"
+ print("Creating new context")
if os.path.exists(outputPath):
os.remove(outputPath)
sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
@@ -60,8 +61,8 @@ def createContext(host, port, outputPath):
def echo(time, rdd):
counts = "Counts at time %s %s" % (time, rdd.collect())
- print counts
- print "Appending to " + os.path.abspath(outputPath)
+ print(counts)
+ print("Appending to " + os.path.abspath(outputPath))
with open(outputPath, 'a') as f:
f.write(counts + "\n")
@@ -70,8 +71,8 @@ def createContext(host, port, outputPath):
if __name__ == "__main__":
if len(sys.argv) != 5:
- print >> sys.stderr, "Usage: recoverable_network_wordcount.py <hostname> <port> "\
- "<checkpoint-directory> <output-file>"
+ print("Usage: recoverable_network_wordcount.py <hostname> <port> "
+ "<checkpoint-directory> <output-file>", file=sys.stderr)
exit(-1)
host, port, checkpoint, output = sys.argv[1:]
ssc = StreamingContext.getOrCreate(checkpoint,
diff --git a/examples/src/main/python/streaming/sql_network_wordcount.py b/examples/src/main/python/streaming/sql_network_wordcount.py
index f89bc562d8..da90c07dbd 100644
--- a/examples/src/main/python/streaming/sql_network_wordcount.py
+++ b/examples/src/main/python/streaming/sql_network_wordcount.py
@@ -27,6 +27,7 @@
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/sql_network_wordcount.py localhost 9999`
"""
+from __future__ import print_function
import os
import sys
@@ -44,7 +45,7 @@ def getSqlContextInstance(sparkContext):
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: sql_network_wordcount.py <hostname> <port> "
+ print("Usage: sql_network_wordcount.py <hostname> <port> ", file=sys.stderr)
exit(-1)
host, port = sys.argv[1:]
sc = SparkContext(appName="PythonSqlNetworkWordCount")
@@ -57,7 +58,7 @@ if __name__ == "__main__":
# Convert RDDs of the words DStream to DataFrame and run SQL query
def process(time, rdd):
- print "========= %s =========" % str(time)
+ print("========= %s =========" % str(time))
try:
# Get the singleton instance of SQLContext
diff --git a/examples/src/main/python/streaming/stateful_network_wordcount.py b/examples/src/main/python/streaming/stateful_network_wordcount.py
index 18a9a5a452..16ef646b7c 100644
--- a/examples/src/main/python/streaming/stateful_network_wordcount.py
+++ b/examples/src/main/python/streaming/stateful_network_wordcount.py
@@ -29,6 +29,7 @@
`$ bin/spark-submit examples/src/main/python/streaming/stateful_network_wordcount.py \
localhost 9999`
"""
+from __future__ import print_function
import sys
@@ -37,7 +38,7 @@ from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: stateful_network_wordcount.py <hostname> <port>"
+ print("Usage: stateful_network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
ssc = StreamingContext(sc, 1)