path: root/examples/src/main/python/streaming
author     Davies Liu <davies@databricks.com>      2015-04-16 16:20:57 -0700
committer  Josh Rosen <joshrosen@databricks.com>   2015-04-16 16:20:57 -0700
commit     04e44b37cc04f62fbf9e08c7076349e0a4d12ea8 (patch)
tree       b6429253955210445ddc37faa4d5166ea25a91e2 /examples/src/main/python/streaming
parent     55f553a979db925aa0c3559f7e80b99d2bf3feb4 (diff)
[SPARK-4897] [PySpark] Python 3 support
This PR updates PySpark to support Python 3 (tested with 3.4).

Known issue: unpickling arrays from Pyrolite is broken in Python 3, so those tests are skipped.

TODO: ec2/spark-ec2.py is not fully tested with python3.

Author: Davies Liu <davies@databricks.com>
Author: twneale <twneale@gmail.com>
Author: Josh Rosen <joshrosen@databricks.com>

Closes #5173 from davies/python3 and squashes the following commits:

d7d6323 [Davies Liu] fix tests
6c52a98 [Davies Liu] fix mllib test
99e334f [Davies Liu] update timeout
b716610 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
cafd5ec [Davies Liu] adddress comments from @mengxr
bf225d7 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
179fc8d [Davies Liu] tuning flaky tests
8c8b957 [Davies Liu] fix ResourceWarning in Python 3
5c57c95 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
4006829 [Davies Liu] fix test
2fc0066 [Davies Liu] add python3 path
71535e9 [Davies Liu] fix xrange and divide
5a55ab4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
125f12c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ed498c8 [Davies Liu] fix compatibility with python 3
820e649 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
e8ce8c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ad7c374 [Davies Liu] fix mllib test and warning
ef1fc2f [Davies Liu] fix tests
4eee14a [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
20112ff [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
59bb492 [Davies Liu] fix tests
1da268c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ca0fdd3 [Davies Liu] fix code style
9563a15 [Davies Liu] add imap back for python 2
0b1ec04 [Davies Liu] make python examples work with Python 3
d2fd566 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
a716d34 [Davies Liu] test with python 3.4
f1700e8 [Davies Liu] fix test in python3
671b1db [Davies Liu] fix test in python3
692ff47 [Davies Liu] fix flaky test
7b9699f [Davies Liu] invalidate import cache for Python 3.3+
9c58497 [Davies Liu] fix kill worker
309bfbf [Davies Liu] keep compatibility
5707476 [Davies Liu] cleanup, fix hash of string in 3.3+
8662d5b [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
f53e1f0 [Davies Liu] fix tests
70b6b73 [Davies Liu] compile ec2/spark_ec2.py in python 3
a39167e [Davies Liu] support customize class in __main__
814c77b [Davies Liu] run unittests with python 3
7f4476e [Davies Liu] mllib tests passed
d737924 [Davies Liu] pass ml tests
375ea17 [Davies Liu] SQL tests pass
6cc42a9 [Davies Liu] rename
431a8de [Davies Liu] streaming tests pass
78901a7 [Davies Liu] fix hash of serializer in Python 3
24b2f2e [Davies Liu] pass all RDD tests
35f48fe [Davies Liu] run future again
1eebac2 [Davies Liu] fix conflict in ec2/spark_ec2.py
6e3c21d [Davies Liu] make cloudpickle work with Python3
2fb2db3 [Josh Rosen] Guard more changes behind sys.version; still doesn't run
1aa5e8f [twneale] Turned out `pickle.DictionaryType is dict` == True, so swapped it out
7354371 [twneale] buffer --> memoryview I'm not super sure if this a valid change, but the 2.7 docs recommend using memoryview over buffer where possible, so hoping it'll work.
b69ccdf [twneale] Uses the pure python pickle._Pickler instead of c-extension _pickle.Pickler. It appears pyspark 2.7 uses the pure python pickler as well, so this shouldn't degrade pickling performance (?).
f40d925 [twneale] xrange --> range
e104215 [twneale] Replaces 2.7 types.InstsanceType with 3.4 `object`....could be horribly wrong depending on how types.InstanceType is used elsewhere in the package--see http://bugs.python.org/issue8206
79de9d0 [twneale] Replaces python2.7 `file` with 3.4 _io.TextIOWrapper
2adb42d [Josh Rosen] Fix up some import differences between Python 2 and 3
854be27 [Josh Rosen] Run `futurize` on Python code
7c5b4ce [Josh Rosen] Remove Python 3 check in shell.py
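Taken together, the streaming-example changes below amount to a handful of 2/3-compatibility idioms. A minimal sketch (illustrative, not taken from the PR itself) of the three the commit log names most often: the print function, xrange vs. range, and division.

    from __future__ import print_function, division
    import sys

    # Python 2's `print >> sys.stderr, msg` is a SyntaxError on Python 3.
    # With the __future__ import, the function form works on 2.6+ and 3.x:
    print("Usage: example.py <arg>", file=sys.stderr)

    # xrange is gone in Python 3; range is lazy there and still fine on 2:
    total = sum(range(10))

    # With `division` imported, `/` is true division on both; `//` floors:
    print(total / 4, total // 4)  # prints: 11.25 11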
Diffstat (limited to 'examples/src/main/python/streaming')
-rw-r--r--  examples/src/main/python/streaming/hdfs_wordcount.py                  3
-rw-r--r--  examples/src/main/python/streaming/kafka_wordcount.py                 3
-rw-r--r--  examples/src/main/python/streaming/network_wordcount.py               3
-rw-r--r--  examples/src/main/python/streaming/recoverable_network_wordcount.py  11
-rw-r--r--  examples/src/main/python/streaming/sql_network_wordcount.py           5
-rw-r--r--  examples/src/main/python/streaming/stateful_network_wordcount.py      3
6 files changed, 17 insertions, 11 deletions
diff --git a/examples/src/main/python/streaming/hdfs_wordcount.py b/examples/src/main/python/streaming/hdfs_wordcount.py
index f7ffb53796..f815dd2682 100644
--- a/examples/src/main/python/streaming/hdfs_wordcount.py
+++ b/examples/src/main/python/streaming/hdfs_wordcount.py
@@ -25,6 +25,7 @@
Then create a text file in `localdir` and the words in the file will get counted.
"""
+from __future__ import print_function
import sys
@@ -33,7 +34,7 @@ from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 2:
- print >> sys.stderr, "Usage: hdfs_wordcount.py <directory>"
+ print("Usage: hdfs_wordcount.py <directory>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingHDFSWordCount")
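The hunks above touch only the usage check; for context, a minimal sketch of the textFileStream word count a script like hdfs_wordcount.py builds (variable names are illustrative, not copied from the file):

    from __future__ import print_function
    import sys

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    if __name__ == "__main__":
        if len(sys.argv) != 2:
            print("Usage: hdfs_wordcount.py <directory>", file=sys.stderr)
            exit(-1)
        sc = SparkContext(appName="PythonStreamingHDFSWordCount")
        ssc = StreamingContext(sc, 1)  # 1-second batch interval

        lines = ssc.textFileStream(sys.argv[1])  # watch the directory for new files
        counts = (lines.flatMap(lambda line: line.split(" "))
                       .map(lambda word: (word, 1))
                       .reduceByKey(lambda a, b: a + b))
        counts.pprint()

        ssc.start()
        ssc.awaitTermination()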
diff --git a/examples/src/main/python/streaming/kafka_wordcount.py b/examples/src/main/python/streaming/kafka_wordcount.py
index 51e1ff822f..b178e7899b 100644
--- a/examples/src/main/python/streaming/kafka_wordcount.py
+++ b/examples/src/main/python/streaming/kafka_wordcount.py
@@ -27,6 +27,7 @@
spark-streaming-kafka-assembly-*.jar examples/src/main/python/streaming/kafka_wordcount.py \
localhost:2181 test`
"""
+from __future__ import print_function
import sys
@@ -36,7 +37,7 @@ from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: kafka_wordcount.py <zk> <topic>"
+ print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
diff --git a/examples/src/main/python/streaming/network_wordcount.py b/examples/src/main/python/streaming/network_wordcount.py
index cfa9c1ff5b..2b48bcfd55 100644
--- a/examples/src/main/python/streaming/network_wordcount.py
+++ b/examples/src/main/python/streaming/network_wordcount.py
@@ -25,6 +25,7 @@
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999`
"""
+from __future__ import print_function
import sys
@@ -33,7 +34,7 @@ from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: network_wordcount.py <hostname> <port>"
+ print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 1)
diff --git a/examples/src/main/python/streaming/recoverable_network_wordcount.py b/examples/src/main/python/streaming/recoverable_network_wordcount.py
index fc6827c82b..ac91f0a06b 100644
--- a/examples/src/main/python/streaming/recoverable_network_wordcount.py
+++ b/examples/src/main/python/streaming/recoverable_network_wordcount.py
@@ -35,6 +35,7 @@
checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
the checkpoint data.
"""
+from __future__ import print_function
import os
import sys
@@ -46,7 +47,7 @@ from pyspark.streaming import StreamingContext
def createContext(host, port, outputPath):
# If you do not see this printed, that means the StreamingContext has been loaded
# from the new checkpoint
- print "Creating new context"
+ print("Creating new context")
if os.path.exists(outputPath):
os.remove(outputPath)
sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
@@ -60,8 +61,8 @@ def createContext(host, port, outputPath):
def echo(time, rdd):
counts = "Counts at time %s %s" % (time, rdd.collect())
- print counts
- print "Appending to " + os.path.abspath(outputPath)
+ print(counts)
+ print("Appending to " + os.path.abspath(outputPath))
with open(outputPath, 'a') as f:
f.write(counts + "\n")
@@ -70,8 +71,8 @@ def createContext(host, port, outputPath):
if __name__ == "__main__":
if len(sys.argv) != 5:
- print >> sys.stderr, "Usage: recoverable_network_wordcount.py <hostname> <port> "\
- "<checkpoint-directory> <output-file>"
+ print("Usage: recoverable_network_wordcount.py <hostname> <port> "
+ "<checkpoint-directory> <output-file>", file=sys.stderr)
exit(-1)
host, port, checkpoint, output = sys.argv[1:]
ssc = StreamingContext.getOrCreate(checkpoint,
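The last hunk is the heart of the recovery story: StreamingContext.getOrCreate only invokes the factory when no checkpoint exists under the given directory, which is why the "Creating new context" message above is not printed on recovery. A hedged sketch of the pattern (paths and names are illustrative):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    def createContext(host, port):
        print("Creating new context")  # only seen on a cold start
        sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
        ssc = StreamingContext(sc, 1)
        ssc.checkpoint("checkpoint")   # metadata and DStream state go here
        # The DStream graph must be defined inside this factory; on recovery
        # it is rebuilt from the checkpoint instead of from this code.
        lines = ssc.socketTextStream(host, port)
        return ssc

    ssc = StreamingContext.getOrCreate("checkpoint",
                                       lambda: createContext("localhost", 9999))
    ssc.start()
    ssc.awaitTermination()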
diff --git a/examples/src/main/python/streaming/sql_network_wordcount.py b/examples/src/main/python/streaming/sql_network_wordcount.py
index f89bc562d8..da90c07dbd 100644
--- a/examples/src/main/python/streaming/sql_network_wordcount.py
+++ b/examples/src/main/python/streaming/sql_network_wordcount.py
@@ -27,6 +27,7 @@
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/sql_network_wordcount.py localhost 9999`
"""
+from __future__ import print_function
import os
import sys
@@ -44,7 +45,7 @@ def getSqlContextInstance(sparkContext):
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: sql_network_wordcount.py <hostname> <port> "
+ print("Usage: sql_network_wordcount.py <hostname> <port> ", file=sys.stderr)
exit(-1)
host, port = sys.argv[1:]
sc = SparkContext(appName="PythonSqlNetworkWordCount")
@@ -57,7 +58,7 @@ if __name__ == "__main__":
# Convert RDDs of the words DStream to DataFrame and run SQL query
def process(time, rdd):
- print "========= %s =========" % str(time)
+ print("========= %s =========" % str(time))
try:
# Get the singleton instance of SQLContext
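The `process` callback above runs once per batch via foreachRDD, so the SQLContext it uses must be created lazily and cached; that is what the getSqlContextInstance helper in this file's context lines does. A minimal sketch of the singleton pattern, assuming Spark 1.x APIs (Row, registerTempTable):

    from pyspark.sql import SQLContext, Row

    def getSqlContextInstance(sparkContext):
        # Create the SQLContext once per process and reuse it across batches
        if 'sqlContextSingletonInstance' not in globals():
            globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
        return globals()['sqlContextSingletonInstance']

    def process(time, rdd):
        print("========= %s =========" % str(time))
        sqlContext = getSqlContextInstance(rdd.context)
        wordsDataFrame = sqlContext.createDataFrame(rdd.map(lambda w: Row(word=w)))
        wordsDataFrame.registerTempTable("words")
        sqlContext.sql("select word, count(*) as total from words group by word").show()

    # wired up per batch as: words.foreachRDD(process)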
diff --git a/examples/src/main/python/streaming/stateful_network_wordcount.py b/examples/src/main/python/streaming/stateful_network_wordcount.py
index 18a9a5a452..16ef646b7c 100644
--- a/examples/src/main/python/streaming/stateful_network_wordcount.py
+++ b/examples/src/main/python/streaming/stateful_network_wordcount.py
@@ -29,6 +29,7 @@
`$ bin/spark-submit examples/src/main/python/streaming/stateful_network_wordcount.py \
localhost 9999`
"""
+from __future__ import print_function
import sys
@@ -37,7 +38,7 @@ from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: stateful_network_wordcount.py <hostname> <port>"
+ print("Usage: stateful_network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
ssc = StreamingContext(sc, 1)
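This last diff again changes only the usage message; judging from the file name, the script itself demonstrates updateStateByKey, which keeps a running count per key across batches and requires a checkpoint directory. A hedged sketch of that pattern (not copied from the file):

    from __future__ import print_function
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint("checkpoint")  # updateStateByKey persists state here

    def updateFunc(new_values, running_count):
        # new_values: this batch's counts for the key; running_count: prior total
        return sum(new_values) + (running_count or 0)

    lines = ssc.socketTextStream("localhost", 9999)
    running_counts = (lines.flatMap(lambda line: line.split(" "))
                           .map(lambda word: (word, 1))
                           .updateStateByKey(updateFunc))
    running_counts.pprint()

    ssc.start()
    ssc.awaitTermination()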