diff options
author | Nicholas Chammas <nicholas.chammas@gmail.com> | 2014-07-21 22:30:53 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-07-21 22:30:53 -0700 |
commit | 5d16d5bbfd242c16ee0d6952c48dcd90651f8ae2 (patch) | |
tree | 61352aa954fb1fb2001586c8795a959421eb3c6f /python/pyspark/context.py | |
parent | c3462c65684885299cf037d56c88bd53c08c6348 (diff) | |
download | spark-5d16d5bbfd242c16ee0d6952c48dcd90651f8ae2.tar.gz spark-5d16d5bbfd242c16ee0d6952c48dcd90651f8ae2.tar.bz2 spark-5d16d5bbfd242c16ee0d6952c48dcd90651f8ae2.zip |
[SPARK-2470] PEP8 fixes to PySpark
This pull request aims to resolve all outstanding PEP8 violations in PySpark.
Author: Nicholas Chammas <nicholas.chammas@gmail.com>
Author: nchammas <nicholas.chammas@gmail.com>
Closes #1505 from nchammas/master and squashes the following commits:
98171af [Nicholas Chammas] [SPARK-2470] revert PEP 8 fixes to cloudpickle
cba7768 [Nicholas Chammas] [SPARK-2470] wrap expression list in parentheses
e178dbe [Nicholas Chammas] [SPARK-2470] style - change position of line break
9127d2b [Nicholas Chammas] [SPARK-2470] wrap expression lists in parentheses
22132a4 [Nicholas Chammas] [SPARK-2470] wrap conditionals in parentheses
24639bc [Nicholas Chammas] [SPARK-2470] fix whitespace for doctest
7d557b7 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to tests.py
8f8e4c0 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to storagelevel.py
b3b96cf [Nicholas Chammas] [SPARK-2470] PEP8 fixes to statcounter.py
d644477 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to worker.py
aa3a7b6 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to sql.py
1916859 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to shell.py
95d1d95 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to serializers.py
a0fec2e [Nicholas Chammas] [SPARK-2470] PEP8 fixes to mllib
c85e1e5 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to join.py
d14f2f1 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to __init__.py
81fcb20 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to resultiterable.py
1bde265 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to java_gateway.py
7fc849c [Nicholas Chammas] [SPARK-2470] PEP8 fixes to daemon.py
ca2d28b [Nicholas Chammas] [SPARK-2470] PEP8 fixes to context.py
f4e0039 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to conf.py
a6d5e4b [Nicholas Chammas] [SPARK-2470] PEP8 fixes to cloudpickle.py
f0a7ebf [Nicholas Chammas] [SPARK-2470] PEP8 fixes to rddsampler.py
4dd148f [nchammas] Merge pull request #5 from apache/master
f7e4581 [Nicholas Chammas] unrelated pep8 fix
a36eed0 [Nicholas Chammas] name ec2 instances and security groups consistently
de7292a [nchammas] Merge pull request #4 from apache/master
2e4fe00 [nchammas] Merge pull request #3 from apache/master
89fde08 [nchammas] Merge pull request #2 from apache/master
69f6e22 [Nicholas Chammas] PEP8 fixes
2627247 [Nicholas Chammas] broke up lines before they hit 100 chars
6544b7e [Nicholas Chammas] [SPARK-2065] give launched instances names
69da6cf [nchammas] Merge pull request #1 from apache/master
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r-- | python/pyspark/context.py | 45 |
1 files changed, 25 insertions, 20 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 95c54e7a5a..e21be0e10a 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -29,7 +29,7 @@ from pyspark.conf import SparkConf from pyspark.files import SparkFiles from pyspark.java_gateway import launch_gateway from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \ - PairDeserializer + PairDeserializer from pyspark.storagelevel import StorageLevel from pyspark import rdd from pyspark.rdd import RDD @@ -50,12 +50,11 @@ class SparkContext(object): _next_accum_id = 0 _active_spark_context = None _lock = Lock() - _python_includes = None # zip and egg files that need to be added to PYTHONPATH - + _python_includes = None # zip and egg files that need to be added to PYTHONPATH def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, - environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None, - gateway=None): + environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None, + gateway=None): """ Create a new SparkContext. At least the master and app name should be set, either through the named parameters here or through C{conf}. @@ -138,8 +137,8 @@ class SparkContext(object): self._accumulatorServer = accumulators._start_update_server() (host, port) = self._accumulatorServer.server_address self._javaAccumulator = self._jsc.accumulator( - self._jvm.java.util.ArrayList(), - self._jvm.PythonAccumulatorParam(host, port)) + self._jvm.java.util.ArrayList(), + self._jvm.PythonAccumulatorParam(host, port)) self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python') @@ -165,7 +164,7 @@ class SparkContext(object): (dirname, filename) = os.path.split(path) self._python_includes.append(filename) sys.path.append(path) - if not dirname in sys.path: + if dirname not in sys.path: sys.path.append(dirname) # Create a temporary directory inside spark.local.dir: @@ -192,15 +191,19 @@ class SparkContext(object): SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile if instance: - if SparkContext._active_spark_context and SparkContext._active_spark_context != instance: + if (SparkContext._active_spark_context and + SparkContext._active_spark_context != instance): currentMaster = SparkContext._active_spark_context.master currentAppName = SparkContext._active_spark_context.appName callsite = SparkContext._active_spark_context._callsite # Raise error if there is already a running Spark context - raise ValueError("Cannot run multiple SparkContexts at once; existing SparkContext(app=%s, master=%s)" \ - " created by %s at %s:%s " \ - % (currentAppName, currentMaster, callsite.function, callsite.file, callsite.linenum)) + raise ValueError( + "Cannot run multiple SparkContexts at once; " + "existing SparkContext(app=%s, master=%s)" + " created by %s at %s:%s " + % (currentAppName, currentMaster, + callsite.function, callsite.file, callsite.linenum)) else: SparkContext._active_spark_context = instance @@ -290,7 +293,7 @@ class SparkContext(object): Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings. - + >>> path = os.path.join(tempdir, "sample-text.txt") >>> with open(path, "w") as testFile: ... testFile.write("Hello world!") @@ -584,11 +587,12 @@ class SparkContext(object): HTTP, HTTPS or FTP URI. """ self.addFile(path) - (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix + (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'): self._python_includes.append(filename) - sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode + # for tests in local mode + sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) def setCheckpointDir(self, dirName): """ @@ -649,9 +653,9 @@ class SparkContext(object): Cancelled If interruptOnCancel is set to true for the job group, then job cancellation will result - in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure - that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208, - where HDFS may respond to Thread.interrupt() by marking nodes as dead. + in Thread.interrupt() being called on the job's executor threads. This is useful to help + ensure that the tasks are actually stopped in a timely manner, but is off by default due + to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead. """ self._jsc.setJobGroup(groupId, description, interruptOnCancel) @@ -688,7 +692,7 @@ class SparkContext(object): """ self._jsc.sc().cancelAllJobs() - def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False): + def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False): """ Executes the given partitionFunc on the specified set of partitions, returning the result as an array of elements. @@ -703,7 +707,7 @@ class SparkContext(object): >>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True) [0, 1, 16, 25] """ - if partitions == None: + if partitions is None: partitions = range(rdd._jrdd.partitions().size()) javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client) @@ -714,6 +718,7 @@ class SparkContext(object): it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) return list(mappedRDD._collect_iterator_through_file(it)) + def _test(): import atexit import doctest |