author     Nicholas Chammas <nicholas.chammas@gmail.com>  2014-09-05 23:08:54 -0700
committer  Reynold Xin <rxin@apache.org>  2014-09-05 23:08:54 -0700
commit     9422c4ee0eaf4a32d2ed7c96799feac2f5f79d40 (patch)
tree       53000806a143eac041be4ad0f84a137f93e43bd3
parent     19f61c165932059e7ce156da2c71429fa8dc27f0 (diff)
[SPARK-3361] Expand PEP 8 checks to include EC2 script and Python examples
This PR resolves [SPARK-3361](https://issues.apache.org/jira/browse/SPARK-3361) by expanding
the PEP 8 checks to cover the remaining Python code base:

* The EC2 script
* All Python / PySpark examples

Author: Nicholas Chammas <nicholas.chammas@gmail.com>

Closes #2297 from nchammas/pep8-rulez and squashes the following commits:

1e5ac9a [Nicholas Chammas] PEP 8 fixes to Python examples
c3dbeff [Nicholas Chammas] PEP 8 fixes to EC2 script
65ef6e8 [Nicholas Chammas] expand PEP 8 checks
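For orientation, the style changes in the diff below are mostly mechanical applications of a
handful of PEP 8 rules. A minimal, self-contained sketch of those rules follows; the names in
it are illustrative only, not taken from the Spark code base:

    # Illustrative sketch of the PEP 8 rules applied throughout this diff;
    # the names below are made up and not part of the Spark code base.

    # E225: binary operators take a space on both sides
    # ("device.volume_type=opts.ebs_vol_type" becomes "... = ...").
    ebs_vol_type = "standard"

    # E231: a space follows ':' in dict literals
    # ('{"port":"9160"}' becomes '{"port": "9160"}').
    conf = {"cassandra.input.thrift.port": "9160",
            "cassandra.input.page.row.size": "3"}

    # E251: keyword arguments take no spaces around '='
    # ("sample(fraction = 0.1)" becomes "sample(fraction=0.1)").
    params = dict(withReplacement=True, fraction=0.1)

    # E501: over-long lines are wrapped with a trailing backslash, or a long
    # fully qualified class name is bound to a short local (keyConv below)
    # so that the call site stays under the line-length limit.
    keyConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"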
-rwxr-xr-x  dev/lint-python                                          |  5
-rwxr-xr-x  ec2/spark_ec2.py                                         | 20
-rw-r--r--  examples/src/main/python/avro_inputformat.py             | 17
-rw-r--r--  examples/src/main/python/cassandra_inputformat.py        | 15
-rw-r--r--  examples/src/main/python/cassandra_outputformat.py       | 23
-rw-r--r--  examples/src/main/python/hbase_inputformat.py            | 10
-rw-r--r--  examples/src/main/python/hbase_outputformat.py           | 18
-rwxr-xr-x  examples/src/main/python/mllib/correlations.py           |  2
-rwxr-xr-x  examples/src/main/python/mllib/decision_tree_runner.py   |  6
-rwxr-xr-x  examples/src/main/python/mllib/random_rdd_generation.py  |  6
-rwxr-xr-x  examples/src/main/python/mllib/sampled_rdds.py           |  8
-rwxr-xr-x  examples/src/main/python/pi.py                           |  2
12 files changed, 79 insertions, 53 deletions
diff --git a/dev/lint-python b/dev/lint-python
index a1e890faa8..79bf70f0b8 100755
--- a/dev/lint-python
+++ b/dev/lint-python
@@ -30,6 +30,7 @@ cd $SPARK_ROOT_DIR
#+ - Download this from a more reliable source. (GitHub raw can be flaky, apparently. (?))
PEP8_SCRIPT_PATH="$SPARK_ROOT_DIR/dev/pep8.py"
PEP8_SCRIPT_REMOTE_PATH="https://raw.githubusercontent.com/jcrocholl/pep8/1.5.7/pep8.py"
+PEP8_PATHS_TO_CHECK="./python/pyspark/ ./ec2/spark_ec2.py ./examples/src/main/python/"
curl --silent -o "$PEP8_SCRIPT_PATH" "$PEP8_SCRIPT_REMOTE_PATH"
curl_status=$?
@@ -44,7 +45,7 @@ fi
#+ first, but we do so so that the check status can
#+ be output before the report, like with the
#+ scalastyle and RAT checks.
-python $PEP8_SCRIPT_PATH ./python/pyspark > "$PEP8_REPORT_PATH"
+python $PEP8_SCRIPT_PATH $PEP8_PATHS_TO_CHECK > "$PEP8_REPORT_PATH"
pep8_status=${PIPESTATUS[0]} #$?
if [ $pep8_status -ne 0 ]; then
@@ -54,7 +55,7 @@ else
echo "PEP 8 checks passed."
fi
-rm -f "$PEP8_REPORT_PATH"
+rm "$PEP8_REPORT_PATH"
rm "$PEP8_SCRIPT_PATH"
exit $pep8_status
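The hunks above add PEP8_PATHS_TO_CHECK and pass it straight to the downloaded pep8.py on the
command line. As a side note, the same check could also be driven from Python; the sketch
below is not how dev/lint-python works, and it assumes the pep8 package (the same 1.5.7
release pinned above) is installed and that a 100-character line limit applies, which is an
assumption rather than something stated in this diff:

    # Sketch only: run the same PEP 8 check via pep8's Python API.
    # Assumes: pip install pep8==1.5.7; max_line_length=100 is an assumed
    # project setting, not taken from this diff.
    import sys

    import pep8

    PATHS_TO_CHECK = ["./python/pyspark/", "./ec2/spark_ec2.py",
                      "./examples/src/main/python/"]

    style = pep8.StyleGuide(max_line_length=100)
    report = style.check_files(PATHS_TO_CHECK)
    sys.exit(1 if report.total_errors else 0)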
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 1670faca4a..8ec88d95e3 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -41,6 +41,7 @@ from boto import ec2
# A URL prefix from which to fetch AMI information
AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list"
+
class UsageError(Exception):
pass
@@ -342,7 +343,6 @@ def launch_cluster(conn, opts, cluster_name):
if opts.ami is None:
opts.ami = get_spark_ami(opts)
-
additional_groups = []
if opts.additional_security_group:
additional_groups = [sg
@@ -363,7 +363,7 @@ def launch_cluster(conn, opts, cluster_name):
for i in range(opts.ebs_vol_num):
device = EBSBlockDeviceType()
device.size = opts.ebs_vol_size
- device.volume_type=opts.ebs_vol_type
+ device.volume_type = opts.ebs_vol_type
device.delete_on_termination = True
block_map["/dev/sd" + chr(ord('s') + i)] = device
@@ -495,6 +495,7 @@ def launch_cluster(conn, opts, cluster_name):
# Return all the instances
return (master_nodes, slave_nodes)
+
def tag_instance(instance, name):
for i in range(0, 5):
try:
@@ -507,9 +508,12 @@ def tag_instance(instance, name):
# Get the EC2 instances in an existing cluster if available.
# Returns a tuple of lists of EC2 instance objects for the masters and slaves
+
+
def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
print "Searching for existing cluster " + cluster_name + "..."
- # Search all the spot instance requests, and copy any tags from the spot instance request to the cluster.
+ # Search all the spot instance requests, and copy any tags from the spot
+ # instance request to the cluster.
spot_instance_requests = conn.get_all_spot_instance_requests()
for req in spot_instance_requests:
if req.state != u'active':
@@ -520,7 +524,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
for res in reservations:
active = [i for i in res.instances if is_active(i)]
for instance in active:
- if (instance.tags.get(u'Name') == None):
+ if (instance.tags.get(u'Name') is None):
tag_instance(instance, name)
# Now proceed to detect master and slaves instances.
reservations = conn.get_all_instances()
@@ -540,13 +544,16 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
return (master_nodes, slave_nodes)
else:
if master_nodes == [] and slave_nodes != []:
- print >> sys.stderr, "ERROR: Could not find master in with name " + cluster_name + "-master"
+ print >> sys.stderr, "ERROR: Could not find master in with name " + \
+ cluster_name + "-master"
else:
print >> sys.stderr, "ERROR: Could not find any existing cluster"
sys.exit(1)
# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.
+
+
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
master = master_nodes[0].public_dns_name
if deploy_ssh_key:
@@ -890,7 +897,8 @@ def real_main():
if opts.security_group_prefix is None:
group_names = [cluster_name + "-master", cluster_name + "-slaves"]
else:
- group_names = [opts.security_group_prefix + "-master", opts.security_group_prefix + "-slaves"]
+ group_names = [opts.security_group_prefix + "-master",
+ opts.security_group_prefix + "-slaves"]
attempt = 1
while attempt <= 3:
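Two conventions account for most of the non-whitespace hunks in spark_ec2.py above:
comparisons against None use `is` rather than `==` (pep8 code E711), and top-level
definitions are separated by two blank lines (E302), which is why blank lines appear before
tag_instance() and get_existing_cluster(). A hypothetical, self-contained sketch:

    # Hypothetical sketch of the E711 and E302 conventions applied above;
    # these names are illustrative, not the real spark_ec2.py functions.


    def name_tag(tags):
        # E711: test for a missing value with "is None", never "== None".
        if tags.get(u'Name') is None:
            return None
        return tags[u'Name']


    # E302: two blank lines before each top-level def (or class), as added
    # above around tag_instance() and get_existing_cluster().
    def group_names(prefix):
        return [prefix + "-master", prefix + "-slaves"]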
diff --git a/examples/src/main/python/avro_inputformat.py b/examples/src/main/python/avro_inputformat.py
index e902ae2975..cfda8d8327 100644
--- a/examples/src/main/python/avro_inputformat.py
+++ b/examples/src/main/python/avro_inputformat.py
@@ -23,7 +23,8 @@ from pyspark import SparkContext
Read data file users.avro in local Spark distro:
$ cd $SPARK_HOME
-$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \
+$ ./bin/spark-submit --driver-class-path /path/to/example/jar \
+> ./examples/src/main/python/avro_inputformat.py \
> examples/src/main/resources/users.avro
{u'favorite_color': None, u'name': u'Alyssa', u'favorite_numbers': [3, 9, 15, 20]}
{u'favorite_color': u'red', u'name': u'Ben', u'favorite_numbers': []}
@@ -40,7 +41,8 @@ $ cat examples/src/main/resources/user.avsc
]
}
-$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \
+$ ./bin/spark-submit --driver-class-path /path/to/example/jar \
+> ./examples/src/main/python/avro_inputformat.py \
> examples/src/main/resources/users.avro examples/src/main/resources/user.avsc
{u'favorite_color': None, u'name': u'Alyssa'}
{u'favorite_color': u'red', u'name': u'Ben'}
@@ -51,8 +53,10 @@ if __name__ == "__main__":
Usage: avro_inputformat <data_file> [reader_schema_file]
Run with example jar:
- ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/avro_inputformat.py <data_file> [reader_schema_file]
- Assumes you have Avro data stored in <data_file>. Reader schema can be optionally specified in [reader_schema_file].
+ ./bin/spark-submit --driver-class-path /path/to/example/jar \
+ /path/to/examples/avro_inputformat.py <data_file> [reader_schema_file]
+ Assumes you have Avro data stored in <data_file>. Reader schema can be optionally specified
+ in [reader_schema_file].
"""
exit(-1)
@@ -62,9 +66,10 @@ if __name__ == "__main__":
conf = None
if len(sys.argv) == 3:
schema_rdd = sc.textFile(sys.argv[2], 1).collect()
- conf = {"avro.schema.input.key" : reduce(lambda x, y: x+y, schema_rdd)}
+ conf = {"avro.schema.input.key": reduce(lambda x, y: x + y, schema_rdd)}
- avro_rdd = sc.newAPIHadoopFile(path,
+ avro_rdd = sc.newAPIHadoopFile(
+ path,
"org.apache.avro.mapreduce.AvroKeyInputFormat",
"org.apache.avro.mapred.AvroKey",
"org.apache.hadoop.io.NullWritable",
diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py
index e4a897f61e..05f34b74df 100644
--- a/examples/src/main/python/cassandra_inputformat.py
+++ b/examples/src/main/python/cassandra_inputformat.py
@@ -51,7 +51,8 @@ if __name__ == "__main__":
Usage: cassandra_inputformat <host> <keyspace> <cf>
Run with example jar:
- ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_inputformat.py <host> <keyspace> <cf>
+ ./bin/spark-submit --driver-class-path /path/to/example/jar \
+ /path/to/examples/cassandra_inputformat.py <host> <keyspace> <cf>
Assumes you have some data in Cassandra already, running on <host>, in <keyspace> and <cf>
"""
exit(-1)
@@ -61,12 +62,12 @@ if __name__ == "__main__":
cf = sys.argv[3]
sc = SparkContext(appName="CassandraInputFormat")
- conf = {"cassandra.input.thrift.address":host,
- "cassandra.input.thrift.port":"9160",
- "cassandra.input.keyspace":keyspace,
- "cassandra.input.columnfamily":cf,
- "cassandra.input.partitioner.class":"Murmur3Partitioner",
- "cassandra.input.page.row.size":"3"}
+ conf = {"cassandra.input.thrift.address": host,
+ "cassandra.input.thrift.port": "9160",
+ "cassandra.input.keyspace": keyspace,
+ "cassandra.input.columnfamily": cf,
+ "cassandra.input.partitioner.class": "Murmur3Partitioner",
+ "cassandra.input.page.row.size": "3"}
cass_rdd = sc.newAPIHadoopRDD(
"org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat",
"java.util.Map",
diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py
index 836c35b5c6..d144539e58 100644
--- a/examples/src/main/python/cassandra_outputformat.py
+++ b/examples/src/main/python/cassandra_outputformat.py
@@ -50,7 +50,8 @@ if __name__ == "__main__":
Usage: cassandra_outputformat <host> <keyspace> <cf> <user_id> <fname> <lname>
Run with example jar:
- ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_outputformat.py <args>
+ ./bin/spark-submit --driver-class-path /path/to/example/jar \
+ /path/to/examples/cassandra_outputformat.py <args>
Assumes you have created the following table <cf> in Cassandra already,
running on <host>, in <keyspace>.
@@ -67,16 +68,16 @@ if __name__ == "__main__":
cf = sys.argv[3]
sc = SparkContext(appName="CassandraOutputFormat")
- conf = {"cassandra.output.thrift.address":host,
- "cassandra.output.thrift.port":"9160",
- "cassandra.output.keyspace":keyspace,
- "cassandra.output.partitioner.class":"Murmur3Partitioner",
- "cassandra.output.cql":"UPDATE " + keyspace + "." + cf + " SET fname = ?, lname = ?",
- "mapreduce.output.basename":cf,
- "mapreduce.outputformat.class":"org.apache.cassandra.hadoop.cql3.CqlOutputFormat",
- "mapreduce.job.output.key.class":"java.util.Map",
- "mapreduce.job.output.value.class":"java.util.List"}
- key = {"user_id" : int(sys.argv[4])}
+ conf = {"cassandra.output.thrift.address": host,
+ "cassandra.output.thrift.port": "9160",
+ "cassandra.output.keyspace": keyspace,
+ "cassandra.output.partitioner.class": "Murmur3Partitioner",
+ "cassandra.output.cql": "UPDATE " + keyspace + "." + cf + " SET fname = ?, lname = ?",
+ "mapreduce.output.basename": cf,
+ "mapreduce.outputformat.class": "org.apache.cassandra.hadoop.cql3.CqlOutputFormat",
+ "mapreduce.job.output.key.class": "java.util.Map",
+ "mapreduce.job.output.value.class": "java.util.List"}
+ key = {"user_id": int(sys.argv[4])}
sc.parallelize([(key, sys.argv[5:])]).saveAsNewAPIHadoopDataset(
conf=conf,
keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter",
diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py
index befacee0de..3b16010f1c 100644
--- a/examples/src/main/python/hbase_inputformat.py
+++ b/examples/src/main/python/hbase_inputformat.py
@@ -51,7 +51,8 @@ if __name__ == "__main__":
Usage: hbase_inputformat <host> <table>
Run with example jar:
- ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_inputformat.py <host> <table>
+ ./bin/spark-submit --driver-class-path /path/to/example/jar \
+ /path/to/examples/hbase_inputformat.py <host> <table>
Assumes you have some data in HBase already, running on <host>, in <table>
"""
exit(-1)
@@ -61,12 +62,15 @@ if __name__ == "__main__":
sc = SparkContext(appName="HBaseInputFormat")
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
+ keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
+ valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
+
hbase_rdd = sc.newAPIHadoopRDD(
"org.apache.hadoop.hbase.mapreduce.TableInputFormat",
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"org.apache.hadoop.hbase.client.Result",
- keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
- valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
+ keyConverter=keyConv,
+ valueConverter=valueConv,
conf=conf)
output = hbase_rdd.collect()
for (k, v) in output:
diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py
index 49bbc5aebd..abb425b1f8 100644
--- a/examples/src/main/python/hbase_outputformat.py
+++ b/examples/src/main/python/hbase_outputformat.py
@@ -44,8 +44,10 @@ if __name__ == "__main__":
Usage: hbase_outputformat <host> <table> <row> <family> <qualifier> <value>
Run with example jar:
- ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_outputformat.py <args>
- Assumes you have created <table> with column family <family> in HBase running on <host> already
+ ./bin/spark-submit --driver-class-path /path/to/example/jar \
+ /path/to/examples/hbase_outputformat.py <args>
+ Assumes you have created <table> with column family <family> in HBase
+ running on <host> already
"""
exit(-1)
@@ -55,13 +57,15 @@ if __name__ == "__main__":
conf = {"hbase.zookeeper.quorum": host,
"hbase.mapred.outputtable": table,
- "mapreduce.outputformat.class" : "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
- "mapreduce.job.output.key.class" : "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
- "mapreduce.job.output.value.class" : "org.apache.hadoop.io.Writable"}
+ "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
+ "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
+ "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
+ keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
+ valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
sc.parallelize([sys.argv[3:]]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
conf=conf,
- keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
- valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter")
+ keyConverter=keyConv,
+ valueConverter=valueConv)
sc.stop()
diff --git a/examples/src/main/python/mllib/correlations.py b/examples/src/main/python/mllib/correlations.py
index 6b16a56e44..4218eca822 100755
--- a/examples/src/main/python/mllib/correlations.py
+++ b/examples/src/main/python/mllib/correlations.py
@@ -28,7 +28,7 @@ from pyspark.mllib.util import MLUtils
if __name__ == "__main__":
- if len(sys.argv) not in [1,2]:
+ if len(sys.argv) not in [1, 2]:
print >> sys.stderr, "Usage: correlations (<file>)"
exit(-1)
sc = SparkContext(appName="PythonCorrelations")
diff --git a/examples/src/main/python/mllib/decision_tree_runner.py b/examples/src/main/python/mllib/decision_tree_runner.py
index 6e4a4a0cb6..61ea4e06ec 100755
--- a/examples/src/main/python/mllib/decision_tree_runner.py
+++ b/examples/src/main/python/mllib/decision_tree_runner.py
@@ -21,7 +21,9 @@ Decision tree classification and regression using MLlib.
This example requires NumPy (http://www.numpy.org/).
"""
-import numpy, os, sys
+import numpy
+import os
+import sys
from operator import add
@@ -127,7 +129,7 @@ if __name__ == "__main__":
(reindexedData, origToNewLabels) = reindexClassLabels(points)
# Train a classifier.
- categoricalFeaturesInfo={} # no categorical features
+ categoricalFeaturesInfo = {} # no categorical features
model = DecisionTree.trainClassifier(reindexedData, numClasses=2,
categoricalFeaturesInfo=categoricalFeaturesInfo)
# Print learned tree and stats.
diff --git a/examples/src/main/python/mllib/random_rdd_generation.py b/examples/src/main/python/mllib/random_rdd_generation.py
index b388d8d83f..1e8892741e 100755
--- a/examples/src/main/python/mllib/random_rdd_generation.py
+++ b/examples/src/main/python/mllib/random_rdd_generation.py
@@ -32,8 +32,8 @@ if __name__ == "__main__":
sc = SparkContext(appName="PythonRandomRDDGeneration")
- numExamples = 10000 # number of examples to generate
- fraction = 0.1 # fraction of data to sample
+ numExamples = 10000 # number of examples to generate
+ fraction = 0.1 # fraction of data to sample
# Example: RandomRDDs.normalRDD
normalRDD = RandomRDDs.normalRDD(sc, numExamples)
@@ -45,7 +45,7 @@ if __name__ == "__main__":
print
# Example: RandomRDDs.normalVectorRDD
- normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2)
+ normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows=numExamples, numCols=2)
print 'Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count()
print ' First 5 samples:'
for sample in normalVectorRDD.take(5):
diff --git a/examples/src/main/python/mllib/sampled_rdds.py b/examples/src/main/python/mllib/sampled_rdds.py
index ec64a5978c..92af3af5eb 100755
--- a/examples/src/main/python/mllib/sampled_rdds.py
+++ b/examples/src/main/python/mllib/sampled_rdds.py
@@ -36,7 +36,7 @@ if __name__ == "__main__":
sc = SparkContext(appName="PythonSampledRDDs")
- fraction = 0.1 # fraction of data to sample
+ fraction = 0.1 # fraction of data to sample
examples = MLUtils.loadLibSVMFile(sc, datapath)
numExamples = examples.count()
@@ -49,9 +49,9 @@ if __name__ == "__main__":
expectedSampleSize = int(numExamples * fraction)
print 'Sampling RDD using fraction %g. Expected sample size = %d.' \
% (fraction, expectedSampleSize)
- sampledRDD = examples.sample(withReplacement = True, fraction = fraction)
+ sampledRDD = examples.sample(withReplacement=True, fraction=fraction)
print ' RDD.sample(): sample has %d examples' % sampledRDD.count()
- sampledArray = examples.takeSample(withReplacement = True, num = expectedSampleSize)
+ sampledArray = examples.takeSample(withReplacement=True, num=expectedSampleSize)
print ' RDD.takeSample(): sample has %d examples' % len(sampledArray)
print
@@ -66,7 +66,7 @@ if __name__ == "__main__":
fractions = {}
for k in keyCountsA.keys():
fractions[k] = fraction
- sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement = True, fractions = fractions)
+ sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement=True, fractions=fractions)
keyCountsB = sampledByKeyRDD.countByKey()
sizeB = sum(keyCountsB.values())
print ' Sampled %d examples using approximate stratified sampling (by label). ==> Sample' \
diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py
index fc37459dc7..ee9036adfa 100755
--- a/examples/src/main/python/pi.py
+++ b/examples/src/main/python/pi.py
@@ -35,7 +35,7 @@ if __name__ == "__main__":
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 < 1 else 0
- count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)
+ count = sc.parallelize(xrange(1, n + 1), slices).map(f).reduce(add)
print "Pi is roughly %f" % (4.0 * count / n)
sc.stop()