path: root/python/pyspark/sql.py
author     Nicholas Chammas <nicholas.chammas@gmail.com>    2014-07-21 22:30:53 -0700
committer  Reynold Xin <rxin@apache.org>                    2014-07-21 22:30:53 -0700
commit     5d16d5bbfd242c16ee0d6952c48dcd90651f8ae2 (patch)
tree       61352aa954fb1fb2001586c8795a959421eb3c6f /python/pyspark/sql.py
parent     c3462c65684885299cf037d56c88bd53c08c6348 (diff)
[SPARK-2470] PEP8 fixes to PySpark
This pull request aims to resolve all outstanding PEP8 violations in PySpark.

Author: Nicholas Chammas <nicholas.chammas@gmail.com>
Author: nchammas <nicholas.chammas@gmail.com>

Closes #1505 from nchammas/master and squashes the following commits:

98171af [Nicholas Chammas] [SPARK-2470] revert PEP 8 fixes to cloudpickle
cba7768 [Nicholas Chammas] [SPARK-2470] wrap expression list in parentheses
e178dbe [Nicholas Chammas] [SPARK-2470] style - change position of line break
9127d2b [Nicholas Chammas] [SPARK-2470] wrap expression lists in parentheses
22132a4 [Nicholas Chammas] [SPARK-2470] wrap conditionals in parentheses
24639bc [Nicholas Chammas] [SPARK-2470] fix whitespace for doctest
7d557b7 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to tests.py
8f8e4c0 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to storagelevel.py
b3b96cf [Nicholas Chammas] [SPARK-2470] PEP8 fixes to statcounter.py
d644477 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to worker.py
aa3a7b6 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to sql.py
1916859 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to shell.py
95d1d95 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to serializers.py
a0fec2e [Nicholas Chammas] [SPARK-2470] PEP8 fixes to mllib
c85e1e5 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to join.py
d14f2f1 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to __init__.py
81fcb20 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to resultiterable.py
1bde265 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to java_gateway.py
7fc849c [Nicholas Chammas] [SPARK-2470] PEP8 fixes to daemon.py
ca2d28b [Nicholas Chammas] [SPARK-2470] PEP8 fixes to context.py
f4e0039 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to conf.py
a6d5e4b [Nicholas Chammas] [SPARK-2470] PEP8 fixes to cloudpickle.py
f0a7ebf [Nicholas Chammas] [SPARK-2470] PEP8 fixes to rddsampler.py
4dd148f [nchammas] Merge pull request #5 from apache/master
f7e4581 [Nicholas Chammas] unrelated pep8 fix
a36eed0 [Nicholas Chammas] name ec2 instances and security groups consistently
de7292a [nchammas] Merge pull request #4 from apache/master
2e4fe00 [nchammas] Merge pull request #3 from apache/master
89fde08 [nchammas] Merge pull request #2 from apache/master
69f6e22 [Nicholas Chammas] PEP8 fixes
2627247 [Nicholas Chammas] broke up lines before they hit 100 chars
6544b7e [Nicholas Chammas] [SPARK-2065] give launched instances names
69da6cf [nchammas] Merge pull request #1 from apache/master
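The changes below are pure style fixes; behavior is unchanged. As a rough illustration of how such violations can be surfaced locally, here is a minimal sketch using the `pep8` checker (the tool of this era, later renamed `pycodestyle`). The 100-character limit mirrors the commit note about breaking up lines before they hit 100 chars; passing it as a keyword to StyleGuide is an assumption about the checker's API, not something taken from this commit.

    # Minimal sketch: count PEP 8 violations in the file this commit touches.
    # Assumes the `pep8` package is installed; max_line_length=100 reflects the
    # 100-character limit mentioned in the commit history above.
    import pep8

    style = pep8.StyleGuide(max_line_length=100)
    report = style.check_files(['python/pyspark/sql.py'])
    print("PEP 8 violations found: %d" % report.total_errors)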
Diffstat (limited to 'python/pyspark/sql.py')
-rw-r--r--   python/pyspark/sql.py   38
1 file changed, 21 insertions(+), 17 deletions(-)
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index ffe177576f..cb83e89176 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -30,7 +30,7 @@ class SQLContext:
tables, execute SQL over tables, cache tables, and read parquet files.
"""
- def __init__(self, sparkContext, sqlContext = None):
+ def __init__(self, sparkContext, sqlContext=None):
"""Create a new SQLContext.
@param sparkContext: The SparkContext to wrap.
@@ -137,7 +137,6 @@ class SQLContext:
jschema_rdd = self._ssql_ctx.parquetFile(path)
return SchemaRDD(jschema_rdd, self)
-
def jsonFile(self, path):
"""Loads a text file storing one JSON object per line,
returning the result as a L{SchemaRDD}.
@@ -234,8 +233,8 @@ class HiveContext(SQLContext):
self._scala_HiveContext = self._get_hive_ctx()
return self._scala_HiveContext
except Py4JError as e:
- raise Exception("You must build Spark with Hive. Export 'SPARK_HIVE=true' and run " \
- "sbt/sbt assembly" , e)
+ raise Exception("You must build Spark with Hive. Export 'SPARK_HIVE=true' and run "
+ "sbt/sbt assembly", e)
def _get_hive_ctx(self):
return self._jvm.HiveContext(self._jsc.sc())
@@ -377,7 +376,7 @@ class SchemaRDD(RDD):
"""
self._jschema_rdd.registerAsTable(name)
- def insertInto(self, tableName, overwrite = False):
+ def insertInto(self, tableName, overwrite=False):
"""Inserts the contents of this SchemaRDD into the specified table.
Optionally overwriting any existing data.
@@ -420,7 +419,7 @@ class SchemaRDD(RDD):
# in Java land in the javaToPython function. May require a custom
# pickle serializer in Pyrolite
return RDD(jrdd, self._sc, BatchedSerializer(
- PickleSerializer())).map(lambda d: Row(d))
+ PickleSerializer())).map(lambda d: Row(d))
# We override the default cache/persist/checkpoint behavior as we want to cache the underlying
# SchemaRDD object in the JVM, not the PythonRDD checkpointed by the super class
@@ -483,6 +482,7 @@ class SchemaRDD(RDD):
else:
raise ValueError("Can only subtract another SchemaRDD")
+
def _test():
import doctest
from array import array
@@ -493,20 +493,25 @@ def _test():
sc = SparkContext('local[4]', 'PythonTest', batchSize=2)
globs['sc'] = sc
globs['sqlCtx'] = SQLContext(sc)
- globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"},
- {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
- jsonStrings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
- '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]}, "field6":[{"field7": "row2"}]}',
- '{"field1" : null, "field2": "row3", "field3":{"field4":33, "field5": []}}']
+ globs['rdd'] = sc.parallelize(
+ [{"field1": 1, "field2": "row1"},
+ {"field1": 2, "field2": "row2"},
+ {"field1": 3, "field2": "row3"}]
+ )
+ jsonStrings = [
+ '{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
+ '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]}, "field6":[{"field7": "row2"}]}',
+ '{"field1" : null, "field2": "row3", "field3":{"field4":33, "field5": []}}'
+ ]
globs['jsonStrings'] = jsonStrings
globs['json'] = sc.parallelize(jsonStrings)
globs['nestedRdd1'] = sc.parallelize([
- {"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
- {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}])
+ {"f1": array('i', [1, 2]), "f2": {"row1": 1.0}},
+ {"f1": array('i', [2, 3]), "f2": {"row2": 2.0}}])
globs['nestedRdd2'] = sc.parallelize([
- {"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)},
- {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}])
- (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS)
+ {"f1": [[1, 2], [2, 3]], "f2": set([1, 2]), "f3": (1, 2)},
+ {"f1": [[2, 3], [3, 4]], "f2": set([2, 3]), "f3": (2, 3)}])
+ (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)
@@ -514,4 +519,3 @@ def _test():
if __name__ == "__main__":
_test()
-
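Taken together, the diff applies a handful of recurring PEP 8 conventions: no spaces around keyword-argument defaults, no space before the colon in dict literals, implicit concatenation of adjacent string literals instead of backslash continuations, and multi-line expression lists wrapped in brackets or parentheses. A small self-contained sketch of the same conventions, using hypothetical names that do not appear in sql.py:

    # Hypothetical example of the conventions applied in this diff; the names
    # below are illustrative only and are not taken from pyspark/sql.py.
    def insert_rows(table_name, overwrite=False):   # no spaces around the keyword default
        if not table_name:
            # adjacent string literals are concatenated implicitly, so the line
            # break needs no trailing backslash
            raise ValueError("table_name must be a non-empty string; "
                             "got an empty value instead")
        rows = [
            {"field1": 1, "field2": "row1"},        # no space before ':' in dict literals
            {"field1": 2, "field2": "row2"},
        ]                                           # multi-line list wrapped in brackets
        return table_name, overwrite, rows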