path: root/examples
Diffstat (limited to 'examples')
-rwxr-xr-x  examples/src/main/python/als.py  15
-rw-r--r--  examples/src/main/python/avro_inputformat.py  9
-rw-r--r--  examples/src/main/python/cassandra_inputformat.py  8
-rw-r--r--  examples/src/main/python/cassandra_outputformat.py  6
-rw-r--r--  examples/src/main/python/hbase_inputformat.py  8
-rw-r--r--  examples/src/main/python/hbase_outputformat.py  6
-rwxr-xr-x  examples/src/main/python/kmeans.py  11
-rwxr-xr-x  examples/src/main/python/logistic_regression.py  20
-rw-r--r--  examples/src/main/python/ml/simple_text_classification_pipeline.py  20
-rwxr-xr-x  examples/src/main/python/mllib/correlations.py  19
-rw-r--r--  examples/src/main/python/mllib/dataset_example.py  13
-rwxr-xr-x  examples/src/main/python/mllib/decision_tree_runner.py  29
-rw-r--r--  examples/src/main/python/mllib/gaussian_mixture_model.py  9
-rw-r--r--  examples/src/main/python/mllib/gradient_boosted_trees.py  7
-rwxr-xr-x  examples/src/main/python/mllib/kmeans.py  5
-rwxr-xr-x  examples/src/main/python/mllib/logistic_regression.py  9
-rwxr-xr-x  examples/src/main/python/mllib/random_forest_example.py  9
-rwxr-xr-x  examples/src/main/python/mllib/random_rdd_generation.py  21
-rwxr-xr-x  examples/src/main/python/mllib/sampled_rdds.py  29
-rw-r--r--  examples/src/main/python/mllib/word2vec.py  5
-rwxr-xr-x  examples/src/main/python/pagerank.py  16
-rw-r--r--  examples/src/main/python/parquet_inputformat.py  7
-rwxr-xr-x  examples/src/main/python/pi.py  5
-rwxr-xr-x  examples/src/main/python/sort.py  6
-rw-r--r--  examples/src/main/python/sql.py  4
-rw-r--r--  examples/src/main/python/status_api_demo.py  10
-rw-r--r--  examples/src/main/python/streaming/hdfs_wordcount.py  3
-rw-r--r--  examples/src/main/python/streaming/kafka_wordcount.py  3
-rw-r--r--  examples/src/main/python/streaming/network_wordcount.py  3
-rw-r--r--  examples/src/main/python/streaming/recoverable_network_wordcount.py  11
-rw-r--r--  examples/src/main/python/streaming/sql_network_wordcount.py  5
-rw-r--r--  examples/src/main/python/streaming/stateful_network_wordcount.py  3
-rwxr-xr-x  examples/src/main/python/transitive_closure.py  10
-rwxr-xr-x  examples/src/main/python/wordcount.py  6
34 files changed, 195 insertions, 155 deletions
diff --git a/examples/src/main/python/als.py b/examples/src/main/python/als.py
index 70b6146e39..1c3a787bd0 100755
--- a/examples/src/main/python/als.py
+++ b/examples/src/main/python/als.py
@@ -21,7 +21,8 @@ ALS in pyspark.mllib.recommendation for more conventional use.
This example requires numpy (http://www.numpy.org/)
"""
-from os.path import realpath
+from __future__ import print_function
+
import sys
import numpy as np
@@ -57,9 +58,9 @@ if __name__ == "__main__":
Usage: als [M] [U] [F] [iterations] [partitions]"
"""
- print >> sys.stderr, """WARN: This is a naive implementation of ALS and is given as an
+ print("""WARN: This is a naive implementation of ALS and is given as an
example. Please use the ALS method found in pyspark.mllib.recommendation for more
- conventional use."""
+ conventional use.""", file=sys.stderr)
sc = SparkContext(appName="PythonALS")
M = int(sys.argv[1]) if len(sys.argv) > 1 else 100
@@ -68,8 +69,8 @@ if __name__ == "__main__":
ITERATIONS = int(sys.argv[4]) if len(sys.argv) > 4 else 5
partitions = int(sys.argv[5]) if len(sys.argv) > 5 else 2
- print "Running ALS with M=%d, U=%d, F=%d, iters=%d, partitions=%d\n" % \
- (M, U, F, ITERATIONS, partitions)
+ print("Running ALS with M=%d, U=%d, F=%d, iters=%d, partitions=%d\n" %
+ (M, U, F, ITERATIONS, partitions))
R = matrix(rand(M, F)) * matrix(rand(U, F).T)
ms = matrix(rand(M, F))
@@ -95,7 +96,7 @@ if __name__ == "__main__":
usb = sc.broadcast(us)
error = rmse(R, ms, us)
- print "Iteration %d:" % i
- print "\nRMSE: %5.4f\n" % error
+ print("Iteration %d:" % i)
+ print("\nRMSE: %5.4f\n" % error)
sc.stop()
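
The als.py hunk above shows the conversion pattern this commit applies across all the Python examples: importing print_function from __future__ turns print into a function on Python 2 as well, so the same print(..., file=sys.stderr) call runs under both interpreter lines. A minimal standalone sketch of the before/after (the warn() helper and its message are illustrative only, not part of the commit):

    # Python 2 only: print is a statement and ">> sys.stderr" redirects it
    #   print >> sys.stderr, "WARN: naive implementation"
    # Python 2 and 3: with the __future__ import, print is an ordinary function
    from __future__ import print_function
    import sys

    def warn(msg):
        # file= selects the target stream instead of the Python 2 ">>" syntax
        print(msg, file=sys.stderr)

    warn("WARN: This is a naive implementation of ALS")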
diff --git a/examples/src/main/python/avro_inputformat.py b/examples/src/main/python/avro_inputformat.py
index 4626bbb7e3..da368ac628 100644
--- a/examples/src/main/python/avro_inputformat.py
+++ b/examples/src/main/python/avro_inputformat.py
@@ -15,9 +15,12 @@
# limitations under the License.
#
+from __future__ import print_function
+
import sys
from pyspark import SparkContext
+from functools import reduce
"""
Read data file users.avro in local Spark distro:
@@ -49,7 +52,7 @@ $ ./bin/spark-submit --driver-class-path /path/to/example/jar \
"""
if __name__ == "__main__":
if len(sys.argv) != 2 and len(sys.argv) != 3:
- print >> sys.stderr, """
+ print("""
Usage: avro_inputformat <data_file> [reader_schema_file]
Run with example jar:
@@ -57,7 +60,7 @@ if __name__ == "__main__":
/path/to/examples/avro_inputformat.py <data_file> [reader_schema_file]
Assumes you have Avro data stored in <data_file>. Reader schema can be optionally specified
in [reader_schema_file].
- """
+ """, file=sys.stderr)
exit(-1)
path = sys.argv[1]
@@ -77,6 +80,6 @@ if __name__ == "__main__":
conf=conf)
output = avro_rdd.map(lambda x: x[0]).collect()
for k in output:
- print k
+ print(k)
sc.stop()
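
Besides the print conversion, the avro_inputformat.py hunk adds from functools import reduce: the reduce builtin was removed from the global namespace in Python 3 and now lives only in functools, while the functools import also works on Python 2. A small sketch, using made-up data rather than anything from the example:

    from __future__ import print_function
    from functools import reduce  # builtin on Python 2, functools-only on Python 3

    fields = ["id", "name", "age"]
    # fold the list into a single comma-separated string
    joined = reduce(lambda acc, part: acc + "," + part, fields)
    print(joined)  # id,name,age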
diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py
index 05f34b74df..93ca0cfcc9 100644
--- a/examples/src/main/python/cassandra_inputformat.py
+++ b/examples/src/main/python/cassandra_inputformat.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
import sys
from pyspark import SparkContext
@@ -47,14 +49,14 @@ cqlsh:test> SELECT * FROM users;
"""
if __name__ == "__main__":
if len(sys.argv) != 4:
- print >> sys.stderr, """
+ print("""
Usage: cassandra_inputformat <host> <keyspace> <cf>
Run with example jar:
./bin/spark-submit --driver-class-path /path/to/example/jar \
/path/to/examples/cassandra_inputformat.py <host> <keyspace> <cf>
Assumes you have some data in Cassandra already, running on <host>, in <keyspace> and <cf>
- """
+ """, file=sys.stderr)
exit(-1)
host = sys.argv[1]
@@ -77,6 +79,6 @@ if __name__ == "__main__":
conf=conf)
output = cass_rdd.collect()
for (k, v) in output:
- print (k, v)
+ print((k, v))
sc.stop()
diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py
index d144539e58..5d643eac92 100644
--- a/examples/src/main/python/cassandra_outputformat.py
+++ b/examples/src/main/python/cassandra_outputformat.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
import sys
from pyspark import SparkContext
@@ -46,7 +48,7 @@ cqlsh:test> SELECT * FROM users;
"""
if __name__ == "__main__":
if len(sys.argv) != 7:
- print >> sys.stderr, """
+ print("""
Usage: cassandra_outputformat <host> <keyspace> <cf> <user_id> <fname> <lname>
Run with example jar:
@@ -60,7 +62,7 @@ if __name__ == "__main__":
... fname text,
... lname text
... );
- """
+ """, file=sys.stderr)
exit(-1)
host = sys.argv[1]
diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py
index 3b16010f1c..e17819d5fe 100644
--- a/examples/src/main/python/hbase_inputformat.py
+++ b/examples/src/main/python/hbase_inputformat.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
import sys
from pyspark import SparkContext
@@ -47,14 +49,14 @@ ROW COLUMN+CELL
"""
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, """
+ print("""
Usage: hbase_inputformat <host> <table>
Run with example jar:
./bin/spark-submit --driver-class-path /path/to/example/jar \
/path/to/examples/hbase_inputformat.py <host> <table>
Assumes you have some data in HBase already, running on <host>, in <table>
- """
+ """, file=sys.stderr)
exit(-1)
host = sys.argv[1]
@@ -74,6 +76,6 @@ if __name__ == "__main__":
conf=conf)
output = hbase_rdd.collect()
for (k, v) in output:
- print (k, v)
+ print((k, v))
sc.stop()
diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py
index abb425b1f8..9e5641789a 100644
--- a/examples/src/main/python/hbase_outputformat.py
+++ b/examples/src/main/python/hbase_outputformat.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
import sys
from pyspark import SparkContext
@@ -40,7 +42,7 @@ ROW COLUMN+CELL
"""
if __name__ == "__main__":
if len(sys.argv) != 7:
- print >> sys.stderr, """
+ print("""
Usage: hbase_outputformat <host> <table> <row> <family> <qualifier> <value>
Run with example jar:
@@ -48,7 +50,7 @@ if __name__ == "__main__":
/path/to/examples/hbase_outputformat.py <args>
Assumes you have created <table> with column family <family> in HBase
running on <host> already
- """
+ """, file=sys.stderr)
exit(-1)
host = sys.argv[1]
diff --git a/examples/src/main/python/kmeans.py b/examples/src/main/python/kmeans.py
index 86ef6f32c8..1939150646 100755
--- a/examples/src/main/python/kmeans.py
+++ b/examples/src/main/python/kmeans.py
@@ -22,6 +22,7 @@ examples/src/main/python/mllib/kmeans.py.
This example requires NumPy (http://www.numpy.org/).
"""
+from __future__ import print_function
import sys
@@ -47,12 +48,12 @@ def closestPoint(p, centers):
if __name__ == "__main__":
if len(sys.argv) != 4:
- print >> sys.stderr, "Usage: kmeans <file> <k> <convergeDist>"
+ print("Usage: kmeans <file> <k> <convergeDist>", file=sys.stderr)
exit(-1)
- print >> sys.stderr, """WARN: This is a naive implementation of KMeans Clustering and is given
+ print("""WARN: This is a naive implementation of KMeans Clustering and is given
as an example! Please refer to examples/src/main/python/mllib/kmeans.py for an example on
- how to use MLlib's KMeans implementation."""
+ how to use MLlib's KMeans implementation.""", file=sys.stderr)
sc = SparkContext(appName="PythonKMeans")
lines = sc.textFile(sys.argv[1])
@@ -69,13 +70,13 @@ if __name__ == "__main__":
pointStats = closest.reduceByKey(
lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))
newPoints = pointStats.map(
- lambda (x, (y, z)): (x, y / z)).collect()
+ lambda xy: (xy[0], xy[1][0] / xy[1][1])).collect()
tempDist = sum(np.sum((kPoints[x] - y) ** 2) for (x, y) in newPoints)
for (x, y) in newPoints:
kPoints[x] = y
- print "Final centers: " + str(kPoints)
+ print("Final centers: " + str(kPoints))
sc.stop()
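
The kmeans.py hunk also demonstrates the second recurring change: Python 3 removed tuple unpacking in function parameters (PEP 3113), so lambdas such as lambda (x, (y, z)): (x, y / z) are now syntax errors and the argument is indexed instead. A toy sketch of the same shape, with sample pairs that are not taken from the example:

    # Python 2:   lambda (x, (y, z)): (x, y / z)   -- rejected by Python 3
    # Python 2/3: name the whole tuple and index into it
    pairs = [(0, (6.0, 3)), (1, (8.0, 2))]
    averaged = list(map(lambda xy: (xy[0], xy[1][0] / xy[1][1]), pairs))
    print(averaged)  # [(0, 2.0), (1, 4.0)]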
diff --git a/examples/src/main/python/logistic_regression.py b/examples/src/main/python/logistic_regression.py
index 3aa56b0528..b318b7d87b 100755
--- a/examples/src/main/python/logistic_regression.py
+++ b/examples/src/main/python/logistic_regression.py
@@ -22,10 +22,8 @@ to act on batches of input data using efficient matrix operations.
In practice, one may prefer to use the LogisticRegression algorithm in
MLlib, as shown in examples/src/main/python/mllib/logistic_regression.py.
"""
+from __future__ import print_function
-from collections import namedtuple
-from math import exp
-from os.path import realpath
import sys
import numpy as np
@@ -42,19 +40,19 @@ D = 10 # Number of dimensions
def readPointBatch(iterator):
strs = list(iterator)
matrix = np.zeros((len(strs), D + 1))
- for i in xrange(len(strs)):
- matrix[i] = np.fromstring(strs[i].replace(',', ' '), dtype=np.float32, sep=' ')
+ for i, s in enumerate(strs):
+ matrix[i] = np.fromstring(s.replace(',', ' '), dtype=np.float32, sep=' ')
return [matrix]
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: logistic_regression <file> <iterations>"
+ print("Usage: logistic_regression <file> <iterations>", file=sys.stderr)
exit(-1)
- print >> sys.stderr, """WARN: This is a naive implementation of Logistic Regression and is
+ print("""WARN: This is a naive implementation of Logistic Regression and is
given as an example! Please refer to examples/src/main/python/mllib/logistic_regression.py
- to see how MLlib's implementation is used."""
+ to see how MLlib's implementation is used.""", file=sys.stderr)
sc = SparkContext(appName="PythonLR")
points = sc.textFile(sys.argv[1]).mapPartitions(readPointBatch).cache()
@@ -62,7 +60,7 @@ if __name__ == "__main__":
# Initialize w to a random value
w = 2 * np.random.ranf(size=D) - 1
- print "Initial w: " + str(w)
+ print("Initial w: " + str(w))
# Compute logistic regression gradient for a matrix of data points
def gradient(matrix, w):
@@ -76,9 +74,9 @@ if __name__ == "__main__":
return x
for i in range(iterations):
- print "On iteration %i" % (i + 1)
+ print("On iteration %i" % (i + 1))
w -= points.map(lambda m: gradient(m, w)).reduce(add)
- print "Final w: " + str(w)
+ print("Final w: " + str(w))
sc.stop()
diff --git a/examples/src/main/python/ml/simple_text_classification_pipeline.py b/examples/src/main/python/ml/simple_text_classification_pipeline.py
index c73edb7fd6..fab21f003b 100644
--- a/examples/src/main/python/ml/simple_text_classification_pipeline.py
+++ b/examples/src/main/python/ml/simple_text_classification_pipeline.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
@@ -37,10 +39,10 @@ if __name__ == "__main__":
# Prepare training documents, which are labeled.
LabeledDocument = Row("id", "text", "label")
- training = sc.parallelize([(0L, "a b c d e spark", 1.0),
- (1L, "b d", 0.0),
- (2L, "spark f g h", 1.0),
- (3L, "hadoop mapreduce", 0.0)]) \
+ training = sc.parallelize([(0, "a b c d e spark", 1.0),
+ (1, "b d", 0.0),
+ (2, "spark f g h", 1.0),
+ (3, "hadoop mapreduce", 0.0)]) \
.map(lambda x: LabeledDocument(*x)).toDF()
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
@@ -54,16 +56,16 @@ if __name__ == "__main__":
# Prepare test documents, which are unlabeled.
Document = Row("id", "text")
- test = sc.parallelize([(4L, "spark i j k"),
- (5L, "l m n"),
- (6L, "mapreduce spark"),
- (7L, "apache hadoop")]) \
+ test = sc.parallelize([(4, "spark i j k"),
+ (5, "l m n"),
+ (6, "mapreduce spark"),
+ (7, "apache hadoop")]) \
.map(lambda x: Document(*x)).toDF()
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
- print row
+ print(row)
sc.stop()
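
The pipeline example drops the L suffix from its document ids because long literals such as 0L are a syntax error on Python 3; plain int literals behave the same on both versions, with Python 2 promoting to long automatically when a value outgrows the machine int. A brief illustration (the values are arbitrary):

    ids = [0, 1, 2, 3]          # previously 0L, 1L, 2L, 3L
    big = 2 ** 64               # silently a long on Python 2, still an int on Python 3
    print(type(big).__name__)   # "long" on Python 2, "int" on Python 3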
diff --git a/examples/src/main/python/mllib/correlations.py b/examples/src/main/python/mllib/correlations.py
index 4218eca822..0e13546b88 100755
--- a/examples/src/main/python/mllib/correlations.py
+++ b/examples/src/main/python/mllib/correlations.py
@@ -18,6 +18,7 @@
"""
Correlations using MLlib.
"""
+from __future__ import print_function
import sys
@@ -29,7 +30,7 @@ from pyspark.mllib.util import MLUtils
if __name__ == "__main__":
if len(sys.argv) not in [1, 2]:
- print >> sys.stderr, "Usage: correlations (<file>)"
+ print("Usage: correlations (<file>)", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonCorrelations")
if len(sys.argv) == 2:
@@ -41,20 +42,20 @@ if __name__ == "__main__":
points = MLUtils.loadLibSVMFile(sc, filepath)\
.map(lambda lp: LabeledPoint(lp.label, lp.features.toArray()))
- print
- print 'Summary of data file: ' + filepath
- print '%d data points' % points.count()
+ print()
+ print('Summary of data file: ' + filepath)
+ print('%d data points' % points.count())
# Statistics (correlations)
- print
- print 'Correlation (%s) between label and each feature' % corrType
- print 'Feature\tCorrelation'
+ print()
+ print('Correlation (%s) between label and each feature' % corrType)
+ print('Feature\tCorrelation')
numFeatures = points.take(1)[0].features.size
labelRDD = points.map(lambda lp: lp.label)
for i in range(numFeatures):
featureRDD = points.map(lambda lp: lp.features[i])
corr = Statistics.corr(labelRDD, featureRDD, corrType)
- print '%d\t%g' % (i, corr)
- print
+ print('%d\t%g' % (i, corr))
+ print()
sc.stop()
diff --git a/examples/src/main/python/mllib/dataset_example.py b/examples/src/main/python/mllib/dataset_example.py
index fcbf56cbf0..e23ecc0c5d 100644
--- a/examples/src/main/python/mllib/dataset_example.py
+++ b/examples/src/main/python/mllib/dataset_example.py
@@ -19,6 +19,7 @@
An example of how to use DataFrame as a dataset for ML. Run with::
bin/spark-submit examples/src/main/python/mllib/dataset_example.py
"""
+from __future__ import print_function
import os
import sys
@@ -32,16 +33,16 @@ from pyspark.mllib.stat import Statistics
def summarize(dataset):
- print "schema: %s" % dataset.schema().json()
+ print("schema: %s" % dataset.schema().json())
labels = dataset.map(lambda r: r.label)
- print "label average: %f" % labels.mean()
+ print("label average: %f" % labels.mean())
features = dataset.map(lambda r: r.features)
summary = Statistics.colStats(features)
- print "features average: %r" % summary.mean()
+ print("features average: %r" % summary.mean())
if __name__ == "__main__":
if len(sys.argv) > 2:
- print >> sys.stderr, "Usage: dataset_example.py <libsvm file>"
+ print("Usage: dataset_example.py <libsvm file>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="DatasetExample")
sqlContext = SQLContext(sc)
@@ -54,9 +55,9 @@ if __name__ == "__main__":
summarize(dataset0)
tempdir = tempfile.NamedTemporaryFile(delete=False).name
os.unlink(tempdir)
- print "Save dataset as a Parquet file to %s." % tempdir
+ print("Save dataset as a Parquet file to %s." % tempdir)
dataset0.saveAsParquetFile(tempdir)
- print "Load it back and summarize it again."
+ print("Load it back and summarize it again.")
dataset1 = sqlContext.parquetFile(tempdir).setName("dataset1").cache()
summarize(dataset1)
shutil.rmtree(tempdir)
diff --git a/examples/src/main/python/mllib/decision_tree_runner.py b/examples/src/main/python/mllib/decision_tree_runner.py
index fccabd841b..513ed8fd51 100755
--- a/examples/src/main/python/mllib/decision_tree_runner.py
+++ b/examples/src/main/python/mllib/decision_tree_runner.py
@@ -20,6 +20,7 @@ Decision tree classification and regression using MLlib.
This example requires NumPy (http://www.numpy.org/).
"""
+from __future__ import print_function
import numpy
import os
@@ -83,18 +84,17 @@ def reindexClassLabels(data):
numClasses = len(classCounts)
# origToNewLabels: class --> index in 0,...,numClasses-1
if (numClasses < 2):
- print >> sys.stderr, \
- "Dataset for classification should have at least 2 classes." + \
- " The given dataset had only %d classes." % numClasses
+ print("Dataset for classification should have at least 2 classes."
+ " The given dataset had only %d classes." % numClasses, file=sys.stderr)
exit(1)
origToNewLabels = dict([(sortedClasses[i], i) for i in range(0, numClasses)])
- print "numClasses = %d" % numClasses
- print "Per-class example fractions, counts:"
- print "Class\tFrac\tCount"
+ print("numClasses = %d" % numClasses)
+ print("Per-class example fractions, counts:")
+ print("Class\tFrac\tCount")
for c in sortedClasses:
frac = classCounts[c] / (numExamples + 0.0)
- print "%g\t%g\t%d" % (c, frac, classCounts[c])
+ print("%g\t%g\t%d" % (c, frac, classCounts[c]))
if (sortedClasses[0] == 0 and sortedClasses[-1] == numClasses - 1):
return (data, origToNewLabels)
@@ -105,8 +105,7 @@ def reindexClassLabels(data):
def usage():
- print >> sys.stderr, \
- "Usage: decision_tree_runner [libsvm format data filepath]"
+ print("Usage: decision_tree_runner [libsvm format data filepath]", file=sys.stderr)
exit(1)
@@ -133,13 +132,13 @@ if __name__ == "__main__":
model = DecisionTree.trainClassifier(reindexedData, numClasses=numClasses,
categoricalFeaturesInfo=categoricalFeaturesInfo)
# Print learned tree and stats.
- print "Trained DecisionTree for classification:"
- print " Model numNodes: %d" % model.numNodes()
- print " Model depth: %d" % model.depth()
- print " Training accuracy: %g" % getAccuracy(model, reindexedData)
+ print("Trained DecisionTree for classification:")
+ print(" Model numNodes: %d" % model.numNodes())
+ print(" Model depth: %d" % model.depth())
+ print(" Training accuracy: %g" % getAccuracy(model, reindexedData))
if model.numNodes() < 20:
- print model.toDebugString()
+ print(model.toDebugString())
else:
- print model
+ print(model)
sc.stop()
diff --git a/examples/src/main/python/mllib/gaussian_mixture_model.py b/examples/src/main/python/mllib/gaussian_mixture_model.py
index a2cd626c9f..2cb8010cdc 100644
--- a/examples/src/main/python/mllib/gaussian_mixture_model.py
+++ b/examples/src/main/python/mllib/gaussian_mixture_model.py
@@ -18,7 +18,8 @@
"""
A Gaussian Mixture Model clustering program using MLlib.
"""
-import sys
+from __future__ import print_function
+
import random
import argparse
import numpy as np
@@ -59,7 +60,7 @@ if __name__ == "__main__":
model = GaussianMixture.train(data, args.k, args.convergenceTol,
args.maxIterations, args.seed)
for i in range(args.k):
- print ("weight = ", model.weights[i], "mu = ", model.gaussians[i].mu,
- "sigma = ", model.gaussians[i].sigma.toArray())
- print ("Cluster labels (first 100): ", model.predict(data).take(100))
+ print(("weight = ", model.weights[i], "mu = ", model.gaussians[i].mu,
+ "sigma = ", model.gaussians[i].sigma.toArray()))
+ print(("Cluster labels (first 100): ", model.predict(data).take(100)))
sc.stop()
diff --git a/examples/src/main/python/mllib/gradient_boosted_trees.py b/examples/src/main/python/mllib/gradient_boosted_trees.py
index e647773ad9..781bd61c9d 100644
--- a/examples/src/main/python/mllib/gradient_boosted_trees.py
+++ b/examples/src/main/python/mllib/gradient_boosted_trees.py
@@ -18,6 +18,7 @@
"""
Gradient boosted Trees classification and regression using MLlib.
"""
+from __future__ import print_function
import sys
@@ -34,7 +35,7 @@ def testClassification(trainingData, testData):
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
- testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() \
+ testErr = labelsAndPredictions.filter(lambda v_p: v_p[0] != v_p[1]).count() \
/ float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification ensemble model:')
@@ -49,7 +50,7 @@ def testRegression(trainingData, testData):
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
- testMSE = labelsAndPredictions.map(lambda (v, p): (v - p) * (v - p)).sum() \
+ testMSE = labelsAndPredictions.map(lambda vp: (vp[0] - vp[1]) * (vp[0] - vp[1])).sum() \
/ float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression ensemble model:')
@@ -58,7 +59,7 @@ def testRegression(trainingData, testData):
if __name__ == "__main__":
if len(sys.argv) > 1:
- print >> sys.stderr, "Usage: gradient_boosted_trees"
+ print("Usage: gradient_boosted_trees", file=sys.stderr)
exit(1)
sc = SparkContext(appName="PythonGradientBoostedTrees")
diff --git a/examples/src/main/python/mllib/kmeans.py b/examples/src/main/python/mllib/kmeans.py
index 2eeb1abeeb..f901a87fa6 100755
--- a/examples/src/main/python/mllib/kmeans.py
+++ b/examples/src/main/python/mllib/kmeans.py
@@ -20,6 +20,7 @@ A K-means clustering program using MLlib.
This example requires NumPy (http://www.numpy.org/).
"""
+from __future__ import print_function
import sys
@@ -34,12 +35,12 @@ def parseVector(line):
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: kmeans <file> <k>"
+ print("Usage: kmeans <file> <k>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="KMeans")
lines = sc.textFile(sys.argv[1])
data = lines.map(parseVector)
k = int(sys.argv[2])
model = KMeans.train(data, k)
- print "Final centers: " + str(model.clusterCenters)
+ print("Final centers: " + str(model.clusterCenters))
sc.stop()
diff --git a/examples/src/main/python/mllib/logistic_regression.py b/examples/src/main/python/mllib/logistic_regression.py
index 8cae27fc4a..d4f1d34e2d 100755
--- a/examples/src/main/python/mllib/logistic_regression.py
+++ b/examples/src/main/python/mllib/logistic_regression.py
@@ -20,11 +20,10 @@ Logistic regression using MLlib.
This example requires NumPy (http://www.numpy.org/).
"""
+from __future__ import print_function
-from math import exp
import sys
-import numpy as np
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
@@ -42,12 +41,12 @@ def parsePoint(line):
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: logistic_regression <file> <iterations>"
+ print("Usage: logistic_regression <file> <iterations>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonLR")
points = sc.textFile(sys.argv[1]).map(parsePoint)
iterations = int(sys.argv[2])
model = LogisticRegressionWithSGD.train(points, iterations)
- print "Final weights: " + str(model.weights)
- print "Final intercept: " + str(model.intercept)
+ print("Final weights: " + str(model.weights))
+ print("Final intercept: " + str(model.intercept))
sc.stop()
diff --git a/examples/src/main/python/mllib/random_forest_example.py b/examples/src/main/python/mllib/random_forest_example.py
index d3c24f7664..4cfdad868c 100755
--- a/examples/src/main/python/mllib/random_forest_example.py
+++ b/examples/src/main/python/mllib/random_forest_example.py
@@ -22,6 +22,7 @@ Note: This example illustrates binary classification.
For information on multiclass classification, please refer to the decision_tree_runner.py
example.
"""
+from __future__ import print_function
import sys
@@ -43,7 +44,7 @@ def testClassification(trainingData, testData):
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
- testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count()\
+ testErr = labelsAndPredictions.filter(lambda v_p: v_p[0] != v_p[1]).count()\
/ float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
@@ -62,8 +63,8 @@ def testRegression(trainingData, testData):
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
- testMSE = labelsAndPredictions.map(lambda (v, p): (v - p) * (v - p)).sum()\
- / float(testData.count())
+ testMSE = labelsAndPredictions.map(lambda v_p1: (v_p1[0] - v_p1[1]) * (v_p1[0] - v_p1[1]))\
+ .sum() / float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression forest model:')
print(model.toDebugString())
@@ -71,7 +72,7 @@ def testRegression(trainingData, testData):
if __name__ == "__main__":
if len(sys.argv) > 1:
- print >> sys.stderr, "Usage: random_forest_example"
+ print("Usage: random_forest_example", file=sys.stderr)
exit(1)
sc = SparkContext(appName="PythonRandomForestExample")
diff --git a/examples/src/main/python/mllib/random_rdd_generation.py b/examples/src/main/python/mllib/random_rdd_generation.py
index 1e8892741e..729bae30b1 100755
--- a/examples/src/main/python/mllib/random_rdd_generation.py
+++ b/examples/src/main/python/mllib/random_rdd_generation.py
@@ -18,6 +18,7 @@
"""
Randomly generated RDDs.
"""
+from __future__ import print_function
import sys
@@ -27,7 +28,7 @@ from pyspark.mllib.random import RandomRDDs
if __name__ == "__main__":
if len(sys.argv) not in [1, 2]:
- print >> sys.stderr, "Usage: random_rdd_generation"
+ print("Usage: random_rdd_generation", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonRandomRDDGeneration")
@@ -37,19 +38,19 @@ if __name__ == "__main__":
# Example: RandomRDDs.normalRDD
normalRDD = RandomRDDs.normalRDD(sc, numExamples)
- print 'Generated RDD of %d examples sampled from the standard normal distribution'\
- % normalRDD.count()
- print ' First 5 samples:'
+ print('Generated RDD of %d examples sampled from the standard normal distribution'
+ % normalRDD.count())
+ print(' First 5 samples:')
for sample in normalRDD.take(5):
- print ' ' + str(sample)
- print
+ print(' ' + str(sample))
+ print()
# Example: RandomRDDs.normalVectorRDD
normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows=numExamples, numCols=2)
- print 'Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count()
- print ' First 5 samples:'
+ print('Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count())
+ print(' First 5 samples:')
for sample in normalVectorRDD.take(5):
- print ' ' + str(sample)
- print
+ print(' ' + str(sample))
+ print()
sc.stop()
diff --git a/examples/src/main/python/mllib/sampled_rdds.py b/examples/src/main/python/mllib/sampled_rdds.py
index 92af3af5eb..b7033ab7da 100755
--- a/examples/src/main/python/mllib/sampled_rdds.py
+++ b/examples/src/main/python/mllib/sampled_rdds.py
@@ -18,6 +18,7 @@
"""
Randomly sampled RDDs.
"""
+from __future__ import print_function
import sys
@@ -27,7 +28,7 @@ from pyspark.mllib.util import MLUtils
if __name__ == "__main__":
if len(sys.argv) not in [1, 2]:
- print >> sys.stderr, "Usage: sampled_rdds <libsvm data file>"
+ print("Usage: sampled_rdds <libsvm data file>", file=sys.stderr)
exit(-1)
if len(sys.argv) == 2:
datapath = sys.argv[1]
@@ -41,24 +42,24 @@ if __name__ == "__main__":
examples = MLUtils.loadLibSVMFile(sc, datapath)
numExamples = examples.count()
if numExamples == 0:
- print >> sys.stderr, "Error: Data file had no samples to load."
+ print("Error: Data file had no samples to load.", file=sys.stderr)
exit(1)
- print 'Loaded data with %d examples from file: %s' % (numExamples, datapath)
+ print('Loaded data with %d examples from file: %s' % (numExamples, datapath))
# Example: RDD.sample() and RDD.takeSample()
expectedSampleSize = int(numExamples * fraction)
- print 'Sampling RDD using fraction %g. Expected sample size = %d.' \
- % (fraction, expectedSampleSize)
+ print('Sampling RDD using fraction %g. Expected sample size = %d.'
+ % (fraction, expectedSampleSize))
sampledRDD = examples.sample(withReplacement=True, fraction=fraction)
- print ' RDD.sample(): sample has %d examples' % sampledRDD.count()
+ print(' RDD.sample(): sample has %d examples' % sampledRDD.count())
sampledArray = examples.takeSample(withReplacement=True, num=expectedSampleSize)
- print ' RDD.takeSample(): sample has %d examples' % len(sampledArray)
+ print(' RDD.takeSample(): sample has %d examples' % len(sampledArray))
- print
+ print()
# Example: RDD.sampleByKey()
keyedRDD = examples.map(lambda lp: (int(lp.label), lp.features))
- print ' Keyed data using label (Int) as key ==> Orig'
+ print(' Keyed data using label (Int) as key ==> Orig')
# Count examples per label in original data.
keyCountsA = keyedRDD.countByKey()
@@ -69,18 +70,18 @@ if __name__ == "__main__":
sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement=True, fractions=fractions)
keyCountsB = sampledByKeyRDD.countByKey()
sizeB = sum(keyCountsB.values())
- print ' Sampled %d examples using approximate stratified sampling (by label). ==> Sample' \
- % sizeB
+ print(' Sampled %d examples using approximate stratified sampling (by label). ==> Sample'
+ % sizeB)
# Compare samples
- print ' \tFractions of examples with key'
- print 'Key\tOrig\tSample'
+ print(' \tFractions of examples with key')
+ print('Key\tOrig\tSample')
for k in sorted(keyCountsA.keys()):
fracA = keyCountsA[k] / float(numExamples)
if sizeB != 0:
fracB = keyCountsB.get(k, 0) / float(sizeB)
else:
fracB = 0
- print '%d\t%g\t%g' % (k, fracA, fracB)
+ print('%d\t%g\t%g' % (k, fracA, fracB))
sc.stop()
diff --git a/examples/src/main/python/mllib/word2vec.py b/examples/src/main/python/mllib/word2vec.py
index 99fef4276a..40d1b88792 100644
--- a/examples/src/main/python/mllib/word2vec.py
+++ b/examples/src/main/python/mllib/word2vec.py
@@ -23,6 +23,7 @@
# grep -o -E '\w+(\W+\w+){0,15}' text8 > text8_lines
# This was done so that the example can be run in local mode
+from __future__ import print_function
import sys
@@ -34,7 +35,7 @@ USAGE = ("bin/spark-submit --driver-memory 4g "
if __name__ == "__main__":
if len(sys.argv) < 2:
- print USAGE
+ print(USAGE)
sys.exit("Argument for file not provided")
file_path = sys.argv[1]
sc = SparkContext(appName='Word2Vec')
@@ -46,5 +47,5 @@ if __name__ == "__main__":
synonyms = model.findSynonyms('china', 40)
for word, cosine_distance in synonyms:
- print "{}: {}".format(word, cosine_distance)
+ print("{}: {}".format(word, cosine_distance))
sc.stop()
diff --git a/examples/src/main/python/pagerank.py b/examples/src/main/python/pagerank.py
index a5f25d78c1..2fdc9773d4 100755
--- a/examples/src/main/python/pagerank.py
+++ b/examples/src/main/python/pagerank.py
@@ -19,6 +19,7 @@
This is an example implementation of PageRank. For more conventional use,
Please refer to PageRank implementation provided by graphx
"""
+from __future__ import print_function
import re
import sys
@@ -42,11 +43,12 @@ def parseNeighbors(urls):
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: pagerank <file> <iterations>"
+ print("Usage: pagerank <file> <iterations>", file=sys.stderr)
exit(-1)
- print >> sys.stderr, """WARN: This is a naive implementation of PageRank and is
- given as an example! Please refer to PageRank implementation provided by graphx"""
+ print("""WARN: This is a naive implementation of PageRank and is
+ given as an example! Please refer to PageRank implementation provided by graphx""",
+ file=sys.stderr)
# Initialize the spark context.
sc = SparkContext(appName="PythonPageRank")
@@ -62,19 +64,19 @@ if __name__ == "__main__":
links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
# Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
- ranks = links.map(lambda (url, neighbors): (url, 1.0))
+ ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))
# Calculates and updates URL ranks continuously using PageRank algorithm.
- for iteration in xrange(int(sys.argv[2])):
+ for iteration in range(int(sys.argv[2])):
# Calculates URL contributions to the rank of other URLs.
contribs = links.join(ranks).flatMap(
- lambda (url, (urls, rank)): computeContribs(urls, rank))
+ lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))
# Re-calculates URL ranks based on neighbor contributions.
ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
# Collects all URL ranks and dump them to console.
for (link, rank) in ranks.collect():
- print "%s has rank: %s." % (link, rank)
+ print("%s has rank: %s." % (link, rank))
sc.stop()
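
pagerank.py combines two of the patterns: xrange() no longer exists on Python 3, so the iteration loop uses range(), and the join() results of shape (url, (neighbors, rank)) are indexed rather than unpacked in the lambda signature. The sketch below only mirrors the shape of the contribs step on toy data; it is not the full PageRank computation:

    links = [("a", (["b", "c"], 1.0)), ("b", (["a"], 1.0))]
    for _ in range(2):          # range() replaces xrange(); fine for small counts
        contribs = [
            (neighbor, pair[1][1] / len(pair[1][0]))
            for pair in links
            for neighbor in pair[1][0]
        ]
    print(contribs)             # [('b', 0.5), ('c', 0.5), ('a', 1.0)]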
diff --git a/examples/src/main/python/parquet_inputformat.py b/examples/src/main/python/parquet_inputformat.py
index fa4c20ab20..96ddac761d 100644
--- a/examples/src/main/python/parquet_inputformat.py
+++ b/examples/src/main/python/parquet_inputformat.py
@@ -1,3 +1,4 @@
+from __future__ import print_function
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -35,14 +36,14 @@ $ ./bin/spark-submit --driver-class-path /path/to/example/jar \\
"""
if __name__ == "__main__":
if len(sys.argv) != 2:
- print >> sys.stderr, """
+ print("""
Usage: parquet_inputformat.py <data_file>
Run with example jar:
./bin/spark-submit --driver-class-path /path/to/example/jar \\
/path/to/examples/parquet_inputformat.py <data_file>
Assumes you have Parquet data stored in <data_file>.
- """
+ """, file=sys.stderr)
exit(-1)
path = sys.argv[1]
@@ -56,6 +57,6 @@ if __name__ == "__main__":
valueConverter='org.apache.spark.examples.pythonconverters.IndexedRecordToJavaConverter')
output = parquet_rdd.map(lambda x: x[1]).collect()
for k in output:
- print k
+ print(k)
sc.stop()
diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py
index a7c74e969c..92e5cf45ab 100755
--- a/examples/src/main/python/pi.py
+++ b/examples/src/main/python/pi.py
@@ -1,3 +1,4 @@
+from __future__ import print_function
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -35,7 +36,7 @@ if __name__ == "__main__":
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 < 1 else 0
- count = sc.parallelize(xrange(1, n + 1), partitions).map(f).reduce(add)
- print "Pi is roughly %f" % (4.0 * count / n)
+ count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
+ print("Pi is roughly %f" % (4.0 * count / n))
sc.stop()
diff --git a/examples/src/main/python/sort.py b/examples/src/main/python/sort.py
index bb686f1751..f6b0ecb02c 100755
--- a/examples/src/main/python/sort.py
+++ b/examples/src/main/python/sort.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
import sys
from pyspark import SparkContext
@@ -22,7 +24,7 @@ from pyspark import SparkContext
if __name__ == "__main__":
if len(sys.argv) != 2:
- print >> sys.stderr, "Usage: sort <file>"
+ print("Usage: sort <file>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonSort")
lines = sc.textFile(sys.argv[1], 1)
@@ -33,6 +35,6 @@ if __name__ == "__main__":
# In reality, we wouldn't want to collect all the data to the driver node.
output = sortedCount.collect()
for (num, unitcount) in output:
- print num
+ print(num)
sc.stop()
diff --git a/examples/src/main/python/sql.py b/examples/src/main/python/sql.py
index d89361f324..87d7b088f0 100644
--- a/examples/src/main/python/sql.py
+++ b/examples/src/main/python/sql.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
import os
from pyspark import SparkContext
@@ -68,6 +70,6 @@ if __name__ == "__main__":
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
for each in teenagers.collect():
- print each[0]
+ print(each[0])
sc.stop()
diff --git a/examples/src/main/python/status_api_demo.py b/examples/src/main/python/status_api_demo.py
index a33bdc475a..49b7902185 100644
--- a/examples/src/main/python/status_api_demo.py
+++ b/examples/src/main/python/status_api_demo.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
import time
import threading
import Queue
@@ -52,15 +54,15 @@ def main():
ids = status.getJobIdsForGroup()
for id in ids:
job = status.getJobInfo(id)
- print "Job", id, "status: ", job.status
+ print("Job", id, "status: ", job.status)
for sid in job.stageIds:
info = status.getStageInfo(sid)
if info:
- print "Stage %d: %d tasks total (%d active, %d complete)" % \
- (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks)
+ print("Stage %d: %d tasks total (%d active, %d complete)" %
+ (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks))
time.sleep(1)
- print "Job results are:", result.get()
+ print("Job results are:", result.get())
sc.stop()
if __name__ == "__main__":
diff --git a/examples/src/main/python/streaming/hdfs_wordcount.py b/examples/src/main/python/streaming/hdfs_wordcount.py
index f7ffb53796..f815dd2682 100644
--- a/examples/src/main/python/streaming/hdfs_wordcount.py
+++ b/examples/src/main/python/streaming/hdfs_wordcount.py
@@ -25,6 +25,7 @@
Then create a text file in `localdir` and the words in the file will get counted.
"""
+from __future__ import print_function
import sys
@@ -33,7 +34,7 @@ from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 2:
- print >> sys.stderr, "Usage: hdfs_wordcount.py <directory>"
+ print("Usage: hdfs_wordcount.py <directory>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingHDFSWordCount")
diff --git a/examples/src/main/python/streaming/kafka_wordcount.py b/examples/src/main/python/streaming/kafka_wordcount.py
index 51e1ff822f..b178e7899b 100644
--- a/examples/src/main/python/streaming/kafka_wordcount.py
+++ b/examples/src/main/python/streaming/kafka_wordcount.py
@@ -27,6 +27,7 @@
spark-streaming-kafka-assembly-*.jar examples/src/main/python/streaming/kafka_wordcount.py \
localhost:2181 test`
"""
+from __future__ import print_function
import sys
@@ -36,7 +37,7 @@ from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: kafka_wordcount.py <zk> <topic>"
+ print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
diff --git a/examples/src/main/python/streaming/network_wordcount.py b/examples/src/main/python/streaming/network_wordcount.py
index cfa9c1ff5b..2b48bcfd55 100644
--- a/examples/src/main/python/streaming/network_wordcount.py
+++ b/examples/src/main/python/streaming/network_wordcount.py
@@ -25,6 +25,7 @@
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999`
"""
+from __future__ import print_function
import sys
@@ -33,7 +34,7 @@ from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: network_wordcount.py <hostname> <port>"
+ print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 1)
diff --git a/examples/src/main/python/streaming/recoverable_network_wordcount.py b/examples/src/main/python/streaming/recoverable_network_wordcount.py
index fc6827c82b..ac91f0a06b 100644
--- a/examples/src/main/python/streaming/recoverable_network_wordcount.py
+++ b/examples/src/main/python/streaming/recoverable_network_wordcount.py
@@ -35,6 +35,7 @@
checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
the checkpoint data.
"""
+from __future__ import print_function
import os
import sys
@@ -46,7 +47,7 @@ from pyspark.streaming import StreamingContext
def createContext(host, port, outputPath):
# If you do not see this printed, that means the StreamingContext has been loaded
# from the new checkpoint
- print "Creating new context"
+ print("Creating new context")
if os.path.exists(outputPath):
os.remove(outputPath)
sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
@@ -60,8 +61,8 @@ def createContext(host, port, outputPath):
def echo(time, rdd):
counts = "Counts at time %s %s" % (time, rdd.collect())
- print counts
- print "Appending to " + os.path.abspath(outputPath)
+ print(counts)
+ print("Appending to " + os.path.abspath(outputPath))
with open(outputPath, 'a') as f:
f.write(counts + "\n")
@@ -70,8 +71,8 @@ def createContext(host, port, outputPath):
if __name__ == "__main__":
if len(sys.argv) != 5:
- print >> sys.stderr, "Usage: recoverable_network_wordcount.py <hostname> <port> "\
- "<checkpoint-directory> <output-file>"
+ print("Usage: recoverable_network_wordcount.py <hostname> <port> "
+ "<checkpoint-directory> <output-file>", file=sys.stderr)
exit(-1)
host, port, checkpoint, output = sys.argv[1:]
ssc = StreamingContext.getOrCreate(checkpoint,
diff --git a/examples/src/main/python/streaming/sql_network_wordcount.py b/examples/src/main/python/streaming/sql_network_wordcount.py
index f89bc562d8..da90c07dbd 100644
--- a/examples/src/main/python/streaming/sql_network_wordcount.py
+++ b/examples/src/main/python/streaming/sql_network_wordcount.py
@@ -27,6 +27,7 @@
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/sql_network_wordcount.py localhost 9999`
"""
+from __future__ import print_function
import os
import sys
@@ -44,7 +45,7 @@ def getSqlContextInstance(sparkContext):
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: sql_network_wordcount.py <hostname> <port> "
+ print("Usage: sql_network_wordcount.py <hostname> <port> ", file=sys.stderr)
exit(-1)
host, port = sys.argv[1:]
sc = SparkContext(appName="PythonSqlNetworkWordCount")
@@ -57,7 +58,7 @@ if __name__ == "__main__":
# Convert RDDs of the words DStream to DataFrame and run SQL query
def process(time, rdd):
- print "========= %s =========" % str(time)
+ print("========= %s =========" % str(time))
try:
# Get the singleton instance of SQLContext
diff --git a/examples/src/main/python/streaming/stateful_network_wordcount.py b/examples/src/main/python/streaming/stateful_network_wordcount.py
index 18a9a5a452..16ef646b7c 100644
--- a/examples/src/main/python/streaming/stateful_network_wordcount.py
+++ b/examples/src/main/python/streaming/stateful_network_wordcount.py
@@ -29,6 +29,7 @@
`$ bin/spark-submit examples/src/main/python/streaming/stateful_network_wordcount.py \
localhost 9999`
"""
+from __future__ import print_function
import sys
@@ -37,7 +38,7 @@ from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: stateful_network_wordcount.py <hostname> <port>"
+ print("Usage: stateful_network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
ssc = StreamingContext(sc, 1)
diff --git a/examples/src/main/python/transitive_closure.py b/examples/src/main/python/transitive_closure.py
index 00a281bfb6..7bf5fb6ddf 100755
--- a/examples/src/main/python/transitive_closure.py
+++ b/examples/src/main/python/transitive_closure.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
import sys
from random import Random
@@ -49,20 +51,20 @@ if __name__ == "__main__":
# the graph to obtain the path (x, z).
# Because join() joins on keys, the edges are stored in reversed order.
- edges = tc.map(lambda (x, y): (y, x))
+ edges = tc.map(lambda x_y: (x_y[1], x_y[0]))
- oldCount = 0L
+ oldCount = 0
nextCount = tc.count()
while True:
oldCount = nextCount
# Perform the join, obtaining an RDD of (y, (z, x)) pairs,
# then project the result to obtain the new (x, z) paths.
- new_edges = tc.join(edges).map(lambda (_, (a, b)): (b, a))
+ new_edges = tc.join(edges).map(lambda __a_b: (__a_b[1][1], __a_b[1][0]))
tc = tc.union(new_edges).distinct().cache()
nextCount = tc.count()
if nextCount == oldCount:
break
- print "TC has %i edges" % tc.count()
+ print("TC has %i edges" % tc.count())
sc.stop()
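
transitive_closure.py shows the nested form of the same restriction: lambda (_, (a, b)): (b, a) can no longer destructure the join result, so the replacement takes the whole (key, (a, b)) tuple and indexes into it. A toy sketch with invented pairs:

    joined = [("y", ("z", "x")), ("q", ("r", "p"))]
    new_edges = list(map(lambda kv: (kv[1][1], kv[1][0]), joined))
    print(new_edges)            # [('x', 'z'), ('p', 'r')]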
diff --git a/examples/src/main/python/wordcount.py b/examples/src/main/python/wordcount.py
index ae6cd13b83..7c0143607b 100755
--- a/examples/src/main/python/wordcount.py
+++ b/examples/src/main/python/wordcount.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
import sys
from operator import add
@@ -23,7 +25,7 @@ from pyspark import SparkContext
if __name__ == "__main__":
if len(sys.argv) != 2:
- print >> sys.stderr, "Usage: wordcount <file>"
+ print("Usage: wordcount <file>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonWordCount")
lines = sc.textFile(sys.argv[1], 1)
@@ -32,6 +34,6 @@ if __name__ == "__main__":
.reduceByKey(add)
output = counts.collect()
for (word, count) in output:
- print "%s: %i" % (word, count)
+ print("%s: %i" % (word, count))
sc.stop()