From 9422c4ee0eaf4a32d2ed7c96799feac2f5f79d40 Mon Sep 17 00:00:00 2001
From: Nicholas Chammas
Date: Fri, 5 Sep 2014 23:08:54 -0700
Subject: [SPARK-3361] Expand PEP 8 checks to include EC2 script and Python
 examples

This PR resolves [SPARK-3361](https://issues.apache.org/jira/browse/SPARK-3361) by expanding the PEP 8 checks to cover the remaining Python code base:

* The EC2 script
* All Python / PySpark examples

Author: Nicholas Chammas

Closes #2297 from nchammas/pep8-rulez and squashes the following commits:

1e5ac9a [Nicholas Chammas] PEP 8 fixes to Python examples
c3dbeff [Nicholas Chammas] PEP 8 fixes to EC2 script
65ef6e8 [Nicholas Chammas] expand PEP 8 checks
---
 dev/lint-python                                     |  5 +++--
 ec2/spark_ec2.py                                    | 20 +++++++++++++------
 examples/src/main/python/avro_inputformat.py        | 17 ++++++++++------
 examples/src/main/python/cassandra_inputformat.py   | 15 +++++++-------
 examples/src/main/python/cassandra_outputformat.py  | 23 +++++++++++-----------
 examples/src/main/python/hbase_inputformat.py       | 10 +++++++---
 examples/src/main/python/hbase_outputformat.py      | 18 ++++++++++-------
 examples/src/main/python/mllib/correlations.py      |  2 +-
 .../src/main/python/mllib/decision_tree_runner.py   |  6 ++++--
 .../src/main/python/mllib/random_rdd_generation.py  |  6 +++---
 examples/src/main/python/mllib/sampled_rdds.py      |  8 ++++----
 examples/src/main/python/pi.py                      |  2 +-
 12 files changed, 79 insertions(+), 53 deletions(-)

diff --git a/dev/lint-python b/dev/lint-python
index a1e890faa8..79bf70f0b8 100755
--- a/dev/lint-python
+++ b/dev/lint-python
@@ -30,6 +30,7 @@ cd $SPARK_ROOT_DIR
 #+ - Download this from a more reliable source. (GitHub raw can be flaky, apparently. (?))
 PEP8_SCRIPT_PATH="$SPARK_ROOT_DIR/dev/pep8.py"
 PEP8_SCRIPT_REMOTE_PATH="https://raw.githubusercontent.com/jcrocholl/pep8/1.5.7/pep8.py"
+PEP8_PATHS_TO_CHECK="./python/pyspark/ ./ec2/spark_ec2.py ./examples/src/main/python/"

 curl --silent -o "$PEP8_SCRIPT_PATH" "$PEP8_SCRIPT_REMOTE_PATH"
 curl_status=$?
@@ -44,7 +45,7 @@ fi
 #+ first, but we do so so that the check status can
 #+ be output before the report, like with the
 #+ scalastyle and RAT checks.
-python $PEP8_SCRIPT_PATH ./python/pyspark > "$PEP8_REPORT_PATH"
+python $PEP8_SCRIPT_PATH $PEP8_PATHS_TO_CHECK > "$PEP8_REPORT_PATH"
 pep8_status=${PIPESTATUS[0]} #$?

 if [ $pep8_status -ne 0 ]; then
@@ -54,7 +55,7 @@ else
     echo "PEP 8 checks passed."
fi -rm -f "$PEP8_REPORT_PATH" +rm "$PEP8_REPORT_PATH" rm "$PEP8_SCRIPT_PATH" exit $pep8_status diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 1670faca4a..8ec88d95e3 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -41,6 +41,7 @@ from boto import ec2 # A URL prefix from which to fetch AMI information AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list" + class UsageError(Exception): pass @@ -342,7 +343,6 @@ def launch_cluster(conn, opts, cluster_name): if opts.ami is None: opts.ami = get_spark_ami(opts) - additional_groups = [] if opts.additional_security_group: additional_groups = [sg @@ -363,7 +363,7 @@ def launch_cluster(conn, opts, cluster_name): for i in range(opts.ebs_vol_num): device = EBSBlockDeviceType() device.size = opts.ebs_vol_size - device.volume_type=opts.ebs_vol_type + device.volume_type = opts.ebs_vol_type device.delete_on_termination = True block_map["/dev/sd" + chr(ord('s') + i)] = device @@ -495,6 +495,7 @@ def launch_cluster(conn, opts, cluster_name): # Return all the instances return (master_nodes, slave_nodes) + def tag_instance(instance, name): for i in range(0, 5): try: @@ -507,9 +508,12 @@ def tag_instance(instance, name): # Get the EC2 instances in an existing cluster if available. # Returns a tuple of lists of EC2 instance objects for the masters and slaves + + def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): print "Searching for existing cluster " + cluster_name + "..." - # Search all the spot instance requests, and copy any tags from the spot instance request to the cluster. + # Search all the spot instance requests, and copy any tags from the spot + # instance request to the cluster. spot_instance_requests = conn.get_all_spot_instance_requests() for req in spot_instance_requests: if req.state != u'active': @@ -520,7 +524,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): for res in reservations: active = [i for i in res.instances if is_active(i)] for instance in active: - if (instance.tags.get(u'Name') == None): + if (instance.tags.get(u'Name') is None): tag_instance(instance, name) # Now proceed to detect master and slaves instances. reservations = conn.get_all_instances() @@ -540,13 +544,16 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): return (master_nodes, slave_nodes) else: if master_nodes == [] and slave_nodes != []: - print >> sys.stderr, "ERROR: Could not find master in with name " + cluster_name + "-master" + print >> sys.stderr, "ERROR: Could not find master in with name " + \ + cluster_name + "-master" else: print >> sys.stderr, "ERROR: Could not find any existing cluster" sys.exit(1) # Deploy configuration files and run setup scripts on a newly launched # or started EC2 cluster. 
+
+
 def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
     master = master_nodes[0].public_dns_name
     if deploy_ssh_key:
@@ -890,7 +897,8 @@ def real_main():
             if opts.security_group_prefix is None:
                 group_names = [cluster_name + "-master", cluster_name + "-slaves"]
             else:
-                group_names = [opts.security_group_prefix + "-master", opts.security_group_prefix + "-slaves"]
+                group_names = [opts.security_group_prefix + "-master",
+                               opts.security_group_prefix + "-slaves"]

             attempt = 1
             while attempt <= 3:
diff --git a/examples/src/main/python/avro_inputformat.py b/examples/src/main/python/avro_inputformat.py
index e902ae2975..cfda8d8327 100644
--- a/examples/src/main/python/avro_inputformat.py
+++ b/examples/src/main/python/avro_inputformat.py
@@ -23,7 +23,8 @@ from pyspark import SparkContext
 Read data file users.avro in local Spark distro:

 $ cd $SPARK_HOME
-$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \
+$ ./bin/spark-submit --driver-class-path /path/to/example/jar \
+> ./examples/src/main/python/avro_inputformat.py \
 > examples/src/main/resources/users.avro
 {u'favorite_color': None, u'name': u'Alyssa', u'favorite_numbers': [3, 9, 15, 20]}
 {u'favorite_color': u'red', u'name': u'Ben', u'favorite_numbers': []}
@@ -40,7 +41,8 @@ $ cat examples/src/main/resources/user.avsc
 ]
 }

-$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \
+$ ./bin/spark-submit --driver-class-path /path/to/example/jar \
+> ./examples/src/main/python/avro_inputformat.py \
 > examples/src/main/resources/users.avro examples/src/main/resources/user.avsc
 {u'favorite_color': None, u'name': u'Alyssa'}
 {u'favorite_color': u'red', u'name': u'Ben'}
@@ -51,8 +53,10 @@ if __name__ == "__main__":
         Usage: avro_inputformat <data_file> [reader_schema_file]

         Run with example jar:
-        ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/avro_inputformat.py <data_file> [reader_schema_file]
-        Assumes you have Avro data stored in <data_file>. Reader schema can be optionally specified in [reader_schema_file].
+        ./bin/spark-submit --driver-class-path /path/to/example/jar \
+        /path/to/examples/avro_inputformat.py <data_file> [reader_schema_file]
+        Assumes you have Avro data stored in <data_file>. Reader schema can be optionally specified
+        in [reader_schema_file].
""" exit(-1) @@ -62,9 +66,10 @@ if __name__ == "__main__": conf = None if len(sys.argv) == 3: schema_rdd = sc.textFile(sys.argv[2], 1).collect() - conf = {"avro.schema.input.key" : reduce(lambda x, y: x+y, schema_rdd)} + conf = {"avro.schema.input.key": reduce(lambda x, y: x + y, schema_rdd)} - avro_rdd = sc.newAPIHadoopFile(path, + avro_rdd = sc.newAPIHadoopFile( + path, "org.apache.avro.mapreduce.AvroKeyInputFormat", "org.apache.avro.mapred.AvroKey", "org.apache.hadoop.io.NullWritable", diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py index e4a897f61e..05f34b74df 100644 --- a/examples/src/main/python/cassandra_inputformat.py +++ b/examples/src/main/python/cassandra_inputformat.py @@ -51,7 +51,8 @@ if __name__ == "__main__": Usage: cassandra_inputformat Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_inputformat.py + ./bin/spark-submit --driver-class-path /path/to/example/jar \ + /path/to/examples/cassandra_inputformat.py Assumes you have some data in Cassandra already, running on , in and """ exit(-1) @@ -61,12 +62,12 @@ if __name__ == "__main__": cf = sys.argv[3] sc = SparkContext(appName="CassandraInputFormat") - conf = {"cassandra.input.thrift.address":host, - "cassandra.input.thrift.port":"9160", - "cassandra.input.keyspace":keyspace, - "cassandra.input.columnfamily":cf, - "cassandra.input.partitioner.class":"Murmur3Partitioner", - "cassandra.input.page.row.size":"3"} + conf = {"cassandra.input.thrift.address": host, + "cassandra.input.thrift.port": "9160", + "cassandra.input.keyspace": keyspace, + "cassandra.input.columnfamily": cf, + "cassandra.input.partitioner.class": "Murmur3Partitioner", + "cassandra.input.page.row.size": "3"} cass_rdd = sc.newAPIHadoopRDD( "org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat", "java.util.Map", diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py index 836c35b5c6..d144539e58 100644 --- a/examples/src/main/python/cassandra_outputformat.py +++ b/examples/src/main/python/cassandra_outputformat.py @@ -50,7 +50,8 @@ if __name__ == "__main__": Usage: cassandra_outputformat Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_outputformat.py + ./bin/spark-submit --driver-class-path /path/to/example/jar \ + /path/to/examples/cassandra_outputformat.py Assumes you have created the following table in Cassandra already, running on , in . @@ -67,16 +68,16 @@ if __name__ == "__main__": cf = sys.argv[3] sc = SparkContext(appName="CassandraOutputFormat") - conf = {"cassandra.output.thrift.address":host, - "cassandra.output.thrift.port":"9160", - "cassandra.output.keyspace":keyspace, - "cassandra.output.partitioner.class":"Murmur3Partitioner", - "cassandra.output.cql":"UPDATE " + keyspace + "." + cf + " SET fname = ?, lname = ?", - "mapreduce.output.basename":cf, - "mapreduce.outputformat.class":"org.apache.cassandra.hadoop.cql3.CqlOutputFormat", - "mapreduce.job.output.key.class":"java.util.Map", - "mapreduce.job.output.value.class":"java.util.List"} - key = {"user_id" : int(sys.argv[4])} + conf = {"cassandra.output.thrift.address": host, + "cassandra.output.thrift.port": "9160", + "cassandra.output.keyspace": keyspace, + "cassandra.output.partitioner.class": "Murmur3Partitioner", + "cassandra.output.cql": "UPDATE " + keyspace + "." 
+ cf + " SET fname = ?, lname = ?", + "mapreduce.output.basename": cf, + "mapreduce.outputformat.class": "org.apache.cassandra.hadoop.cql3.CqlOutputFormat", + "mapreduce.job.output.key.class": "java.util.Map", + "mapreduce.job.output.value.class": "java.util.List"} + key = {"user_id": int(sys.argv[4])} sc.parallelize([(key, sys.argv[5:])]).saveAsNewAPIHadoopDataset( conf=conf, keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter", diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py index befacee0de..3b16010f1c 100644 --- a/examples/src/main/python/hbase_inputformat.py +++ b/examples/src/main/python/hbase_inputformat.py @@ -51,7 +51,8 @@ if __name__ == "__main__": Usage: hbase_inputformat Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_inputformat.py
+        ./bin/spark-submit --driver-class-path /path/to/example/jar \
+        /path/to/examples/hbase_inputformat.py
         Assumes you have some data in HBase already, running on <host>, in <table>
""" exit(-1) @@ -61,12 +62,15 @@ if __name__ == "__main__": sc = SparkContext(appName="HBaseInputFormat") conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table} + keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" + valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" + hbase_rdd = sc.newAPIHadoopRDD( "org.apache.hadoop.hbase.mapreduce.TableInputFormat", "org.apache.hadoop.hbase.io.ImmutableBytesWritable", "org.apache.hadoop.hbase.client.Result", - keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter", - valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter", + keyConverter=keyConv, + valueConverter=valueConv, conf=conf) output = hbase_rdd.collect() for (k, v) in output: diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py index 49bbc5aebd..abb425b1f8 100644 --- a/examples/src/main/python/hbase_outputformat.py +++ b/examples/src/main/python/hbase_outputformat.py @@ -44,8 +44,10 @@ if __name__ == "__main__": Usage: hbase_outputformat

         Run with example jar:
-        ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_outputformat.py
-        Assumes you have created <table> with column family <family> in HBase running on <host> already
+        ./bin/spark-submit --driver-class-path /path/to/example/jar \
+        /path/to/examples/hbase_outputformat.py
+        Assumes you have created <table> with column family <family> in HBase
+        running on <host> already
         """
         exit(-1)

@@ -55,13 +57,15 @@ if __name__ == "__main__":

     conf = {"hbase.zookeeper.quorum": host,
             "hbase.mapred.outputtable": table,
-            "mapreduce.outputformat.class" : "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
-            "mapreduce.job.output.key.class" : "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
-            "mapreduce.job.output.value.class" : "org.apache.hadoop.io.Writable"}
+            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
+            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
+            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
+    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
+    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

     sc.parallelize([sys.argv[3:]]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
         conf=conf,
-        keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
-        valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter")
+        keyConverter=keyConv,
+        valueConverter=valueConv)

     sc.stop()
diff --git a/examples/src/main/python/mllib/correlations.py b/examples/src/main/python/mllib/correlations.py
index 6b16a56e44..4218eca822 100755
--- a/examples/src/main/python/mllib/correlations.py
+++ b/examples/src/main/python/mllib/correlations.py
@@ -28,7 +28,7 @@ from pyspark.mllib.util import MLUtils


 if __name__ == "__main__":
-    if len(sys.argv) not in [1,2]:
+    if len(sys.argv) not in [1, 2]:
         print >> sys.stderr, "Usage: correlations (<file>)"
         exit(-1)
     sc = SparkContext(appName="PythonCorrelations")
diff --git a/examples/src/main/python/mllib/decision_tree_runner.py b/examples/src/main/python/mllib/decision_tree_runner.py
index 6e4a4a0cb6..61ea4e06ec 100755
--- a/examples/src/main/python/mllib/decision_tree_runner.py
+++ b/examples/src/main/python/mllib/decision_tree_runner.py
@@ -21,7 +21,9 @@ Decision tree classification and regression using MLlib.
 This example requires NumPy (http://www.numpy.org/).
 """

-import numpy, os, sys
+import numpy
+import os
+import sys

 from operator import add

@@ -127,7 +129,7 @@ if __name__ == "__main__":
     (reindexedData, origToNewLabels) = reindexClassLabels(points)

     # Train a classifier.
-    categoricalFeaturesInfo={} # no categorical features
+    categoricalFeaturesInfo = {}  # no categorical features
     model = DecisionTree.trainClassifier(reindexedData, numClasses=2,
                                          categoricalFeaturesInfo=categoricalFeaturesInfo)
     # Print learned tree and stats.
diff --git a/examples/src/main/python/mllib/random_rdd_generation.py b/examples/src/main/python/mllib/random_rdd_generation.py
index b388d8d83f..1e8892741e 100755
--- a/examples/src/main/python/mllib/random_rdd_generation.py
+++ b/examples/src/main/python/mllib/random_rdd_generation.py
@@ -32,8 +32,8 @@ if __name__ == "__main__":

     sc = SparkContext(appName="PythonRandomRDDGeneration")

-    numExamples = 10000 # number of examples to generate
-    fraction = 0.1 # fraction of data to sample
+    numExamples = 10000  # number of examples to generate
+    fraction = 0.1  # fraction of data to sample

     # Example: RandomRDDs.normalRDD
     normalRDD = RandomRDDs.normalRDD(sc, numExamples)
@@ -45,7 +45,7 @@ if __name__ == "__main__":
     print

     # Example: RandomRDDs.normalVectorRDD
-    normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2)
+    normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows=numExamples, numCols=2)
     print 'Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count()
     print '  First 5 samples:'
     for sample in normalVectorRDD.take(5):
diff --git a/examples/src/main/python/mllib/sampled_rdds.py b/examples/src/main/python/mllib/sampled_rdds.py
index ec64a5978c..92af3af5eb 100755
--- a/examples/src/main/python/mllib/sampled_rdds.py
+++ b/examples/src/main/python/mllib/sampled_rdds.py
@@ -36,7 +36,7 @@ if __name__ == "__main__":

     sc = SparkContext(appName="PythonSampledRDDs")

-    fraction = 0.1 # fraction of data to sample
+    fraction = 0.1  # fraction of data to sample

     examples = MLUtils.loadLibSVMFile(sc, datapath)
     numExamples = examples.count()
@@ -49,9 +49,9 @@ if __name__ == "__main__":
     expectedSampleSize = int(numExamples * fraction)
     print 'Sampling RDD using fraction %g. Expected sample size = %d.' \
         % (fraction, expectedSampleSize)
-    sampledRDD = examples.sample(withReplacement = True, fraction = fraction)
+    sampledRDD = examples.sample(withReplacement=True, fraction=fraction)
     print '  RDD.sample(): sample has %d examples' % sampledRDD.count()
-    sampledArray = examples.takeSample(withReplacement = True, num = expectedSampleSize)
+    sampledArray = examples.takeSample(withReplacement=True, num=expectedSampleSize)
     print '  RDD.takeSample(): sample has %d examples' % len(sampledArray)
     print

@@ -66,7 +66,7 @@ if __name__ == "__main__":
     fractions = {}
     for k in keyCountsA.keys():
         fractions[k] = fraction
-    sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement = True, fractions = fractions)
+    sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement=True, fractions=fractions)
     keyCountsB = sampledByKeyRDD.countByKey()
     sizeB = sum(keyCountsB.values())
     print '  Sampled %d examples using approximate stratified sampling (by label). ==> Sample' \
diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py
index fc37459dc7..ee9036adfa 100755
--- a/examples/src/main/python/pi.py
+++ b/examples/src/main/python/pi.py
@@ -35,7 +35,7 @@ if __name__ == "__main__":
         y = random() * 2 - 1
         return 1 if x ** 2 + y ** 2 < 1 else 0

-    count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)
+    count = sc.parallelize(xrange(1, n + 1), slices).map(f).reduce(add)
     print "Pi is roughly %f" % (4.0 * count / n)

     sc.stop()
--
cgit v1.2.3