From 9422c4ee0eaf4a32d2ed7c96799feac2f5f79d40 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Fri, 5 Sep 2014 23:08:54 -0700 Subject: [SPARK-3361] Expand PEP 8 checks to include EC2 script and Python examples This PR resolves [SPARK-3361](https://issues.apache.org/jira/browse/SPARK-3361) by expanding the PEP 8 checks to cover the remaining Python code base: * The EC2 script * All Python / PySpark examples Author: Nicholas Chammas Closes #2297 from nchammas/pep8-rulez and squashes the following commits: 1e5ac9a [Nicholas Chammas] PEP 8 fixes to Python examples c3dbeff [Nicholas Chammas] PEP 8 fixes to EC2 script 65ef6e8 [Nicholas Chammas] expand PEP 8 checks --- examples/src/main/python/avro_inputformat.py | 17 ++++++++++------ examples/src/main/python/cassandra_inputformat.py | 15 +++++++------- examples/src/main/python/cassandra_outputformat.py | 23 +++++++++++----------- examples/src/main/python/hbase_inputformat.py | 10 +++++++--- examples/src/main/python/hbase_outputformat.py | 18 ++++++++++------- examples/src/main/python/mllib/correlations.py | 2 +- .../src/main/python/mllib/decision_tree_runner.py | 6 ++++-- .../src/main/python/mllib/random_rdd_generation.py | 6 +++--- examples/src/main/python/mllib/sampled_rdds.py | 8 ++++---- examples/src/main/python/pi.py | 2 +- 10 files changed, 62 insertions(+), 45 deletions(-) (limited to 'examples') diff --git a/examples/src/main/python/avro_inputformat.py b/examples/src/main/python/avro_inputformat.py index e902ae2975..cfda8d8327 100644 --- a/examples/src/main/python/avro_inputformat.py +++ b/examples/src/main/python/avro_inputformat.py @@ -23,7 +23,8 @@ from pyspark import SparkContext Read data file users.avro in local Spark distro: $ cd $SPARK_HOME -$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \ +$ ./bin/spark-submit --driver-class-path /path/to/example/jar \ +> ./examples/src/main/python/avro_inputformat.py \ > examples/src/main/resources/users.avro {u'favorite_color': None, u'name': u'Alyssa', u'favorite_numbers': [3, 9, 15, 20]} {u'favorite_color': u'red', u'name': u'Ben', u'favorite_numbers': []} @@ -40,7 +41,8 @@ $ cat examples/src/main/resources/user.avsc ] } -$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \ +$ ./bin/spark-submit --driver-class-path /path/to/example/jar \ +> ./examples/src/main/python/avro_inputformat.py \ > examples/src/main/resources/users.avro examples/src/main/resources/user.avsc {u'favorite_color': None, u'name': u'Alyssa'} {u'favorite_color': u'red', u'name': u'Ben'} @@ -51,8 +53,10 @@ if __name__ == "__main__": Usage: avro_inputformat [reader_schema_file] Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/avro_inputformat.py [reader_schema_file] - Assumes you have Avro data stored in . Reader schema can be optionally specified in [reader_schema_file]. + ./bin/spark-submit --driver-class-path /path/to/example/jar \ + /path/to/examples/avro_inputformat.py [reader_schema_file] + Assumes you have Avro data stored in . Reader schema can be optionally specified + in [reader_schema_file]. """ exit(-1) @@ -62,9 +66,10 @@ if __name__ == "__main__": conf = None if len(sys.argv) == 3: schema_rdd = sc.textFile(sys.argv[2], 1).collect() - conf = {"avro.schema.input.key" : reduce(lambda x, y: x+y, schema_rdd)} + conf = {"avro.schema.input.key": reduce(lambda x, y: x + y, schema_rdd)} - avro_rdd = sc.newAPIHadoopFile(path, + avro_rdd = sc.newAPIHadoopFile( + path, "org.apache.avro.mapreduce.AvroKeyInputFormat", "org.apache.avro.mapred.AvroKey", "org.apache.hadoop.io.NullWritable", diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py index e4a897f61e..05f34b74df 100644 --- a/examples/src/main/python/cassandra_inputformat.py +++ b/examples/src/main/python/cassandra_inputformat.py @@ -51,7 +51,8 @@ if __name__ == "__main__": Usage: cassandra_inputformat Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_inputformat.py + ./bin/spark-submit --driver-class-path /path/to/example/jar \ + /path/to/examples/cassandra_inputformat.py Assumes you have some data in Cassandra already, running on , in and """ exit(-1) @@ -61,12 +62,12 @@ if __name__ == "__main__": cf = sys.argv[3] sc = SparkContext(appName="CassandraInputFormat") - conf = {"cassandra.input.thrift.address":host, - "cassandra.input.thrift.port":"9160", - "cassandra.input.keyspace":keyspace, - "cassandra.input.columnfamily":cf, - "cassandra.input.partitioner.class":"Murmur3Partitioner", - "cassandra.input.page.row.size":"3"} + conf = {"cassandra.input.thrift.address": host, + "cassandra.input.thrift.port": "9160", + "cassandra.input.keyspace": keyspace, + "cassandra.input.columnfamily": cf, + "cassandra.input.partitioner.class": "Murmur3Partitioner", + "cassandra.input.page.row.size": "3"} cass_rdd = sc.newAPIHadoopRDD( "org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat", "java.util.Map", diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py index 836c35b5c6..d144539e58 100644 --- a/examples/src/main/python/cassandra_outputformat.py +++ b/examples/src/main/python/cassandra_outputformat.py @@ -50,7 +50,8 @@ if __name__ == "__main__": Usage: cassandra_outputformat Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_outputformat.py + ./bin/spark-submit --driver-class-path /path/to/example/jar \ + /path/to/examples/cassandra_outputformat.py Assumes you have created the following table in Cassandra already, running on , in . @@ -67,16 +68,16 @@ if __name__ == "__main__": cf = sys.argv[3] sc = SparkContext(appName="CassandraOutputFormat") - conf = {"cassandra.output.thrift.address":host, - "cassandra.output.thrift.port":"9160", - "cassandra.output.keyspace":keyspace, - "cassandra.output.partitioner.class":"Murmur3Partitioner", - "cassandra.output.cql":"UPDATE " + keyspace + "." + cf + " SET fname = ?, lname = ?", - "mapreduce.output.basename":cf, - "mapreduce.outputformat.class":"org.apache.cassandra.hadoop.cql3.CqlOutputFormat", - "mapreduce.job.output.key.class":"java.util.Map", - "mapreduce.job.output.value.class":"java.util.List"} - key = {"user_id" : int(sys.argv[4])} + conf = {"cassandra.output.thrift.address": host, + "cassandra.output.thrift.port": "9160", + "cassandra.output.keyspace": keyspace, + "cassandra.output.partitioner.class": "Murmur3Partitioner", + "cassandra.output.cql": "UPDATE " + keyspace + "." + cf + " SET fname = ?, lname = ?", + "mapreduce.output.basename": cf, + "mapreduce.outputformat.class": "org.apache.cassandra.hadoop.cql3.CqlOutputFormat", + "mapreduce.job.output.key.class": "java.util.Map", + "mapreduce.job.output.value.class": "java.util.List"} + key = {"user_id": int(sys.argv[4])} sc.parallelize([(key, sys.argv[5:])]).saveAsNewAPIHadoopDataset( conf=conf, keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter", diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py index befacee0de..3b16010f1c 100644 --- a/examples/src/main/python/hbase_inputformat.py +++ b/examples/src/main/python/hbase_inputformat.py @@ -51,7 +51,8 @@ if __name__ == "__main__": Usage: hbase_inputformat Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_inputformat.py
+ ./bin/spark-submit --driver-class-path /path/to/example/jar \ + /path/to/examples/hbase_inputformat.py
Assumes you have some data in HBase already, running on , in
""" exit(-1) @@ -61,12 +62,15 @@ if __name__ == "__main__": sc = SparkContext(appName="HBaseInputFormat") conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table} + keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" + valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" + hbase_rdd = sc.newAPIHadoopRDD( "org.apache.hadoop.hbase.mapreduce.TableInputFormat", "org.apache.hadoop.hbase.io.ImmutableBytesWritable", "org.apache.hadoop.hbase.client.Result", - keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter", - valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter", + keyConverter=keyConv, + valueConverter=valueConv, conf=conf) output = hbase_rdd.collect() for (k, v) in output: diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py index 49bbc5aebd..abb425b1f8 100644 --- a/examples/src/main/python/hbase_outputformat.py +++ b/examples/src/main/python/hbase_outputformat.py @@ -44,8 +44,10 @@ if __name__ == "__main__": Usage: hbase_outputformat
Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_outputformat.py - Assumes you have created
with column family in HBase running on already + ./bin/spark-submit --driver-class-path /path/to/example/jar \ + /path/to/examples/hbase_outputformat.py + Assumes you have created
with column family in HBase + running on already """ exit(-1) @@ -55,13 +57,15 @@ if __name__ == "__main__": conf = {"hbase.zookeeper.quorum": host, "hbase.mapred.outputtable": table, - "mapreduce.outputformat.class" : "org.apache.hadoop.hbase.mapreduce.TableOutputFormat", - "mapreduce.job.output.key.class" : "org.apache.hadoop.hbase.io.ImmutableBytesWritable", - "mapreduce.job.output.value.class" : "org.apache.hadoop.io.Writable"} + "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat", + "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable", + "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"} + keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" + valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" sc.parallelize([sys.argv[3:]]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset( conf=conf, - keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter", - valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter") + keyConverter=keyConv, + valueConverter=valueConv) sc.stop() diff --git a/examples/src/main/python/mllib/correlations.py b/examples/src/main/python/mllib/correlations.py index 6b16a56e44..4218eca822 100755 --- a/examples/src/main/python/mllib/correlations.py +++ b/examples/src/main/python/mllib/correlations.py @@ -28,7 +28,7 @@ from pyspark.mllib.util import MLUtils if __name__ == "__main__": - if len(sys.argv) not in [1,2]: + if len(sys.argv) not in [1, 2]: print >> sys.stderr, "Usage: correlations ()" exit(-1) sc = SparkContext(appName="PythonCorrelations") diff --git a/examples/src/main/python/mllib/decision_tree_runner.py b/examples/src/main/python/mllib/decision_tree_runner.py index 6e4a4a0cb6..61ea4e06ec 100755 --- a/examples/src/main/python/mllib/decision_tree_runner.py +++ b/examples/src/main/python/mllib/decision_tree_runner.py @@ -21,7 +21,9 @@ Decision tree classification and regression using MLlib. This example requires NumPy (http://www.numpy.org/). """ -import numpy, os, sys +import numpy +import os +import sys from operator import add @@ -127,7 +129,7 @@ if __name__ == "__main__": (reindexedData, origToNewLabels) = reindexClassLabels(points) # Train a classifier. - categoricalFeaturesInfo={} # no categorical features + categoricalFeaturesInfo = {} # no categorical features model = DecisionTree.trainClassifier(reindexedData, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo) # Print learned tree and stats. diff --git a/examples/src/main/python/mllib/random_rdd_generation.py b/examples/src/main/python/mllib/random_rdd_generation.py index b388d8d83f..1e8892741e 100755 --- a/examples/src/main/python/mllib/random_rdd_generation.py +++ b/examples/src/main/python/mllib/random_rdd_generation.py @@ -32,8 +32,8 @@ if __name__ == "__main__": sc = SparkContext(appName="PythonRandomRDDGeneration") - numExamples = 10000 # number of examples to generate - fraction = 0.1 # fraction of data to sample + numExamples = 10000 # number of examples to generate + fraction = 0.1 # fraction of data to sample # Example: RandomRDDs.normalRDD normalRDD = RandomRDDs.normalRDD(sc, numExamples) @@ -45,7 +45,7 @@ if __name__ == "__main__": print # Example: RandomRDDs.normalVectorRDD - normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2) + normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows=numExamples, numCols=2) print 'Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count() print ' First 5 samples:' for sample in normalVectorRDD.take(5): diff --git a/examples/src/main/python/mllib/sampled_rdds.py b/examples/src/main/python/mllib/sampled_rdds.py index ec64a5978c..92af3af5eb 100755 --- a/examples/src/main/python/mllib/sampled_rdds.py +++ b/examples/src/main/python/mllib/sampled_rdds.py @@ -36,7 +36,7 @@ if __name__ == "__main__": sc = SparkContext(appName="PythonSampledRDDs") - fraction = 0.1 # fraction of data to sample + fraction = 0.1 # fraction of data to sample examples = MLUtils.loadLibSVMFile(sc, datapath) numExamples = examples.count() @@ -49,9 +49,9 @@ if __name__ == "__main__": expectedSampleSize = int(numExamples * fraction) print 'Sampling RDD using fraction %g. Expected sample size = %d.' \ % (fraction, expectedSampleSize) - sampledRDD = examples.sample(withReplacement = True, fraction = fraction) + sampledRDD = examples.sample(withReplacement=True, fraction=fraction) print ' RDD.sample(): sample has %d examples' % sampledRDD.count() - sampledArray = examples.takeSample(withReplacement = True, num = expectedSampleSize) + sampledArray = examples.takeSample(withReplacement=True, num=expectedSampleSize) print ' RDD.takeSample(): sample has %d examples' % len(sampledArray) print @@ -66,7 +66,7 @@ if __name__ == "__main__": fractions = {} for k in keyCountsA.keys(): fractions[k] = fraction - sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement = True, fractions = fractions) + sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement=True, fractions=fractions) keyCountsB = sampledByKeyRDD.countByKey() sizeB = sum(keyCountsB.values()) print ' Sampled %d examples using approximate stratified sampling (by label). ==> Sample' \ diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py index fc37459dc7..ee9036adfa 100755 --- a/examples/src/main/python/pi.py +++ b/examples/src/main/python/pi.py @@ -35,7 +35,7 @@ if __name__ == "__main__": y = random() * 2 - 1 return 1 if x ** 2 + y ** 2 < 1 else 0 - count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add) + count = sc.parallelize(xrange(1, n + 1), slices).map(f).reduce(add) print "Pi is roughly %f" % (4.0 * count / n) sc.stop() -- cgit v1.2.3