author     Nicholas Chammas <nicholas.chammas@gmail.com>  2014-07-21 22:30:53 -0700
committer  Reynold Xin <rxin@apache.org>  2014-07-21 22:30:53 -0700
commit     5d16d5bbfd242c16ee0d6952c48dcd90651f8ae2 (patch)
tree       61352aa954fb1fb2001586c8795a959421eb3c6f /python/pyspark/context.py
parent     c3462c65684885299cf037d56c88bd53c08c6348 (diff)
[SPARK-2470] PEP8 fixes to PySpark
This pull request aims to resolve all outstanding PEP8 violations in PySpark.

Author: Nicholas Chammas <nicholas.chammas@gmail.com>
Author: nchammas <nicholas.chammas@gmail.com>

Closes #1505 from nchammas/master and squashes the following commits:

98171af [Nicholas Chammas] [SPARK-2470] revert PEP 8 fixes to cloudpickle
cba7768 [Nicholas Chammas] [SPARK-2470] wrap expression list in parentheses
e178dbe [Nicholas Chammas] [SPARK-2470] style - change position of line break
9127d2b [Nicholas Chammas] [SPARK-2470] wrap expression lists in parentheses
22132a4 [Nicholas Chammas] [SPARK-2470] wrap conditionals in parentheses
24639bc [Nicholas Chammas] [SPARK-2470] fix whitespace for doctest
7d557b7 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to tests.py
8f8e4c0 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to storagelevel.py
b3b96cf [Nicholas Chammas] [SPARK-2470] PEP8 fixes to statcounter.py
d644477 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to worker.py
aa3a7b6 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to sql.py
1916859 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to shell.py
95d1d95 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to serializers.py
a0fec2e [Nicholas Chammas] [SPARK-2470] PEP8 fixes to mllib
c85e1e5 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to join.py
d14f2f1 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to __init__.py
81fcb20 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to resultiterable.py
1bde265 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to java_gateway.py
7fc849c [Nicholas Chammas] [SPARK-2470] PEP8 fixes to daemon.py
ca2d28b [Nicholas Chammas] [SPARK-2470] PEP8 fixes to context.py
f4e0039 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to conf.py
a6d5e4b [Nicholas Chammas] [SPARK-2470] PEP8 fixes to cloudpickle.py
f0a7ebf [Nicholas Chammas] [SPARK-2470] PEP8 fixes to rddsampler.py
4dd148f [nchammas] Merge pull request #5 from apache/master
f7e4581 [Nicholas Chammas] unrelated pep8 fix
a36eed0 [Nicholas Chammas] name ec2 instances and security groups consistently
de7292a [nchammas] Merge pull request #4 from apache/master
2e4fe00 [nchammas] Merge pull request #3 from apache/master
89fde08 [nchammas] Merge pull request #2 from apache/master
69f6e22 [Nicholas Chammas] PEP8 fixes
2627247 [Nicholas Chammas] broke up lines before they hit 100 chars
6544b7e [Nicholas Chammas] [SPARK-2065] give launched instances names
69da6cf [nchammas] Merge pull request #1 from apache/master
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r--  python/pyspark/context.py  45
1 file changed, 25 insertions, 20 deletions
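One way to reproduce the style check locally is to run a PEP 8 checker against the changed file. This is only a sketch; it assumes the `pep8` tool (later renamed `pycodestyle`) is installed and passes the 100-character line limit mentioned in the commit history above:

    pip install pep8
    pep8 --max-line-length=100 python/pyspark/context.py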
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 95c54e7a5a..e21be0e10a 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -29,7 +29,7 @@ from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
- PairDeserializer
+ PairDeserializer
from pyspark.storagelevel import StorageLevel
from pyspark import rdd
from pyspark.rdd import RDD
@@ -50,12 +50,11 @@ class SparkContext(object):
_next_accum_id = 0
_active_spark_context = None
_lock = Lock()
- _python_includes = None # zip and egg files that need to be added to PYTHONPATH
-
+ _python_includes = None # zip and egg files that need to be added to PYTHONPATH
def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
- environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
- gateway=None):
+ environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+ gateway=None):
"""
Create a new SparkContext. At least the master and app name should be set,
either through the named parameters here or through C{conf}.
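The hunk above re-indents the continuation lines of the __init__ signature. PEP 8 accepts either aligning continuation lines with the opening delimiter or using a hanging indent with extra indentation to set the arguments apart from the body; a minimal illustration with hypothetical parameter names:

    # Aligned with the opening delimiter:
    def connect(self, host=None, port=None,
                timeout=None, retries=None):
        pass

    # Hanging indent; the extra indentation distinguishes arguments from the body:
    def connect(
            self, host=None, port=None,
            timeout=None, retries=None):
        pass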
@@ -138,8 +137,8 @@ class SparkContext(object):
self._accumulatorServer = accumulators._start_update_server()
(host, port) = self._accumulatorServer.server_address
self._javaAccumulator = self._jsc.accumulator(
- self._jvm.java.util.ArrayList(),
- self._jvm.PythonAccumulatorParam(host, port))
+ self._jvm.java.util.ArrayList(),
+ self._jvm.PythonAccumulatorParam(host, port))
self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')
@@ -165,7 +164,7 @@ class SparkContext(object):
(dirname, filename) = os.path.split(path)
self._python_includes.append(filename)
sys.path.append(path)
- if not dirname in sys.path:
+ if dirname not in sys.path:
sys.path.append(dirname)
# Create a temporary directory inside spark.local.dir:
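The change above replaces `not dirname in sys.path` with `dirname not in sys.path`; both evaluate the same way, but PEP 8 prefers the `not in` operator because it reads as a single membership test. A small standalone sketch with made-up values:

    paths = ["/usr/lib", "/usr/local/lib"]
    candidate = "/opt/lib"
    # Preferred form: the membership test is explicit
    if candidate not in paths:
        paths.append(candidate)
    print(paths)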
@@ -192,15 +191,19 @@ class SparkContext(object):
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
if instance:
- if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
+ if (SparkContext._active_spark_context and
+ SparkContext._active_spark_context != instance):
currentMaster = SparkContext._active_spark_context.master
currentAppName = SparkContext._active_spark_context.appName
callsite = SparkContext._active_spark_context._callsite
# Raise error if there is already a running Spark context
- raise ValueError("Cannot run multiple SparkContexts at once; existing SparkContext(app=%s, master=%s)" \
- " created by %s at %s:%s " \
- % (currentAppName, currentMaster, callsite.function, callsite.file, callsite.linenum))
+ raise ValueError(
+ "Cannot run multiple SparkContexts at once; "
+ "existing SparkContext(app=%s, master=%s)"
+ " created by %s at %s:%s "
+ % (currentAppName, currentMaster,
+ callsite.function, callsite.file, callsite.linenum))
else:
SparkContext._active_spark_context = instance
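The rewritten raise statement above uses two idioms PEP 8 recommends over backslash continuations: wrapping a multi-line condition in parentheses, and splitting a long message into adjacent string literals, which the parser joins into one string. A minimal sketch with made-up values, not the SparkContext logic itself:

    existing, requested = "app-1", "app-2"
    try:
        # Parentheses let the condition span lines without a backslash
        if (existing is not None and
                existing != requested):
            # Adjacent string literals are concatenated by the parser
            raise ValueError(
                "Cannot run multiple contexts at once; "
                "existing context(app=%s) is still running" % existing)
    except ValueError as error:
        print(error)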
@@ -290,7 +293,7 @@ class SparkContext(object):
Read a text file from HDFS, a local file system (available on all
nodes), or any Hadoop-supported file system URI, and return it as an
RDD of Strings.
-
+
>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
... testFile.write("Hello world!")
@@ -584,11 +587,12 @@ class SparkContext(object):
HTTP, HTTPS or FTP URI.
"""
self.addFile(path)
- (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix
+ (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix
if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
self._python_includes.append(filename)
- sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode
+ # for tests in local mode
+ sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename))
def setCheckpointDir(self, dirName):
"""
@@ -649,9 +653,9 @@ class SparkContext(object):
Cancelled
If interruptOnCancel is set to true for the job group, then job cancellation will result
- in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
- that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
- where HDFS may respond to Thread.interrupt() by marking nodes as dead.
+ in Thread.interrupt() being called on the job's executor threads. This is useful to help
+ ensure that the tasks are actually stopped in a timely manner, but is off by default due
+ to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
"""
self._jsc.setJobGroup(groupId, description, interruptOnCancel)
@@ -688,7 +692,7 @@ class SparkContext(object):
"""
self._jsc.sc().cancelAllJobs()
- def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
+ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
"""
Executes the given partitionFunc on the specified set of partitions,
returning the result as an array of elements.
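The signature change above (`partitions = None` becomes `partitions=None`) follows PEP 8's rule that no spaces surround `=` when it introduces a keyword argument or a default parameter value. A tiny sketch with hypothetical names:

    def run_job(func, partitions=None, allow_local=False):  # no spaces around '=' in defaults
        return func(partitions, allow_local)

    # The same rule applies at the call site when passing keyword arguments:
    print(run_job(lambda parts, local: (parts, local), partitions=[0, 1], allow_local=True))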
@@ -703,7 +707,7 @@ class SparkContext(object):
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]
"""
- if partitions == None:
+ if partitions is None:
partitions = range(rdd._jrdd.partitions().size())
javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client)
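The comparison above changes from `partitions == None` to `partitions is None`; PEP 8 requires identity tests against singletons such as None, since `==` can be overridden by a class's `__eq__` while None is a single object. A short sketch:

    partitions = None
    # Comparisons to singletons like None use 'is' / 'is not', never '==':
    if partitions is None:
        partitions = range(4)
    print(list(partitions))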
@@ -714,6 +718,7 @@ class SparkContext(object):
it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
return list(mappedRDD._collect_iterator_through_file(it))
+
def _test():
import atexit
import doctest