author     Nicholas Chammas <nicholas.chammas@gmail.com>  2014-08-06 12:58:24 -0700
committer  Reynold Xin <rxin@apache.org>                  2014-08-06 12:58:24 -0700
commit     d614967b0bad1e6c5277d612602ec0a653a00258 (patch)
tree       8df1a52cbe074af4f928c0ac8f08a63075882d0b /python/pyspark
parent     a6cd31108f0d73ce6823daafe8447677e03cfd13 (diff)
[SPARK-2627] [PySpark] have the build enforce PEP 8 automatically
As described in [SPARK-2627](https://issues.apache.org/jira/browse/SPARK-2627), we'd like Python code to automatically be checked for PEP 8 compliance by Jenkins. This pull request aims to do that.

Notes:
* We may need to install [`pep8`](https://pypi.python.org/pypi/pep8) on the build server.
* I'm expecting tests to fail now that PEP 8 compliance is being checked as part of the build. I'm fine with cleaning up any remaining PEP 8 violations as part of this pull request.
* I did not understand why the RAT and scalastyle reports are saved to text files. I did the same for the PEP 8 check, but only so that the console output style can match those for the RAT and scalastyle checks. The PEP 8 report is removed right after the check is complete.
* Updates to the ["Contributing to Spark"](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark) guide will be submitted elsewhere, as I don't believe that text is part of the Spark repo.

Author: Nicholas Chammas <nicholas.chammas@gmail.com>
Author: nchammas <nicholas.chammas@gmail.com>

Closes #1744 from nchammas/master and squashes the following commits:

274b238 [Nicholas Chammas] [SPARK-2627] [PySpark] minor indentation changes
983d963 [nchammas] Merge pull request #5 from apache/master
1db5314 [nchammas] Merge pull request #4 from apache/master
0e0245f [Nicholas Chammas] [SPARK-2627] undo erroneous whitespace fixes
bf30942 [Nicholas Chammas] [SPARK-2627] PEP8: comment spacing
6db9a44 [nchammas] Merge pull request #3 from apache/master
7b4750e [Nicholas Chammas] merge upstream changes
91b7584 [Nicholas Chammas] [SPARK-2627] undo unnecessary line breaks
44e3e56 [Nicholas Chammas] [SPARK-2627] use tox.ini to exclude files
b09fae2 [Nicholas Chammas] don't wrap comments unnecessarily
bfb9f9f [Nicholas Chammas] [SPARK-2627] keep up with the PEP 8 fixes
9da347f [nchammas] Merge pull request #2 from apache/master
aa5b4b5 [Nicholas Chammas] [SPARK-2627] follow Spark bash style for if blocks
d0a83b9 [Nicholas Chammas] [SPARK-2627] check that pep8 downloaded fine
dffb5dd [Nicholas Chammas] [SPARK-2627] download pep8 at runtime
a1ce7ae [Nicholas Chammas] [SPARK-2627] space out test report sections
21da538 [Nicholas Chammas] [SPARK-2627] it's PEP 8, not PEP8
6f4900b [Nicholas Chammas] [SPARK-2627] more misc PEP 8 fixes
fe57ed0 [Nicholas Chammas] removing merge conflict backups
9c01d4c [nchammas] Merge pull request #1 from apache/master
9a66cb0 [Nicholas Chammas] resolving merge conflicts
a31ccc4 [Nicholas Chammas] [SPARK-2627] miscellaneous PEP 8 fixes
beaa9ac [Nicholas Chammas] [SPARK-2627] fail check on non-zero status
723ed39 [Nicholas Chammas] always delete the report file
0541ebb [Nicholas Chammas] [SPARK-2627] call Python linter from run-tests
12440fa [Nicholas Chammas] [SPARK-2627] add Scala linter
61c07b9 [Nicholas Chammas] [SPARK-2627] add Python linter
75ad552 [Nicholas Chammas] make check output style consistent
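The linter script itself lives in the build tooling rather than under python/pyspark, so it does not appear in the diff below. As a rough, hypothetical sketch of the flow the commit message describes (run `pep8` over the Python sources, mirror the RAT/scalastyle style by writing a report file, fail the build on a non-zero exit status, and delete the report afterwards), the check might look like the following in Python; the `pep8-report.txt` name and the `lint_python()` helper are illustrative assumptions, not the actual Spark script:

```python
# Hypothetical sketch only: the real check is a shell script in Spark's build
# tooling, not part of this diff. The report file name and directory list are
# assumptions made for illustration.
import os
import subprocess
import sys

PEP8_REPORT = "pep8-report.txt"    # written, printed, then removed, like the
                                   # RAT and scalastyle reports
PYTHON_DIRS = ["python/pyspark"]   # sources to check


def lint_python():
    # Run the pep8 tool and capture its output in the report file so the
    # console output style matches the other checks.
    with open(PEP8_REPORT, "w") as report:
        status = subprocess.call(["pep8"] + PYTHON_DIRS, stdout=report)
    try:
        with open(PEP8_REPORT) as report:
            output = report.read()
        if status != 0:
            # Fail the check on non-zero status and show the violations.
            sys.stderr.write("PEP 8 checks failed:\n%s" % output)
            sys.exit(status)
        print("PEP 8 checks passed.")
    finally:
        # Always delete the report file once the check is complete.
        os.remove(PEP8_REPORT)


if __name__ == "__main__":
    lint_python()
```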
Diffstat (limited to 'python/pyspark')
-rw-r--r--  python/pyspark/accumulators.py          |   7
-rw-r--r--  python/pyspark/broadcast.py             |   1
-rw-r--r--  python/pyspark/conf.py                  |   1
-rw-r--r--  python/pyspark/context.py               |  25
-rw-r--r--  python/pyspark/daemon.py                |   5
-rw-r--r--  python/pyspark/files.py                 |   1
-rw-r--r--  python/pyspark/java_gateway.py          |   1
-rw-r--r--  python/pyspark/mllib/_common.py         |   5
-rw-r--r--  python/pyspark/mllib/classification.py  |   8
-rw-r--r--  python/pyspark/mllib/clustering.py      |   3
-rw-r--r--  python/pyspark/mllib/linalg.py          |   2
-rw-r--r--  python/pyspark/mllib/random.py          |  14
-rw-r--r--  python/pyspark/mllib/recommendation.py  |   2
-rw-r--r--  python/pyspark/mllib/regression.py      |  12
-rw-r--r--  python/pyspark/mllib/stat.py            |   1
-rw-r--r--  python/pyspark/mllib/tests.py           |  11
-rw-r--r--  python/pyspark/mllib/tree.py            |   4
-rw-r--r--  python/pyspark/mllib/util.py            |   1
-rw-r--r--  python/pyspark/rdd.py                   |  22
-rw-r--r--  python/pyspark/rddsampler.py            |   4
-rw-r--r--  python/pyspark/resultiterable.py        |   2
-rw-r--r--  python/pyspark/serializers.py           |  21
-rw-r--r--  python/pyspark/shuffle.py               |  20
-rw-r--r--  python/pyspark/sql.py                   |  66
-rw-r--r--  python/pyspark/storagelevel.py          |   1
-rw-r--r--  python/pyspark/tests.py                 | 143
26 files changed, 249 insertions(+), 134 deletions(-)
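The hunks that follow are almost entirely mechanical PEP 8 cleanups: a blank line inserted between a class statement and its docstring, two spaces before inline comments, one import per line, spaces after commas, no space before the colon in dict literals, and continuation lines re-aligned with the opening delimiter. A small, purely illustrative (non-Spark) before/after sketch of those categories:

```python
# Illustrative sketch only -- not Spark code. It shows the style that the
# fixes in the hunks below move toward.

# Before the cleanup, code along these lines was common:
#
#     import os, time                         # several imports on one line
#     conf = {"mapred.input.dir" : path}      # space before ':'
#     x = [1.0,1.0]                           # missing space after ','
#     gc.collect() # one space before an inline comment
#     jrdd = api.hadoopRDD(jsc, fmt, key_cls,
#         value_cls)                          # continuation under-indented

# The same code after the PEP 8 fixes:
import gc
import os    # imported only to show the one-import-per-line style
import time


class Example(object):

    """A blank line now separates the class statement from its docstring."""

    def load(self, api, jsc, fmt, key_cls, value_cls, path):
        conf = {"mapred.input.dir": path}     # no space before ':'
        x = [1.0, 1.0]                        # space after ','
        gc.collect()  # two spaces before an inline comment
        jrdd = api.hadoopRDD(jsc, fmt, key_cls,
                             value_cls)       # aligned with the opening '('
        return conf, x, jrdd
```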
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 45d36e5d0e..f133cf6f7b 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -110,6 +110,7 @@ def _deserialize_accumulator(aid, zero_value, accum_param):
class Accumulator(object):
+
"""
A shared variable that can be accumulated, i.e., has a commutative and associative "add"
operation. Worker tasks on a Spark cluster can add values to an Accumulator with the C{+=}
@@ -166,6 +167,7 @@ class Accumulator(object):
class AccumulatorParam(object):
+
"""
Helper object that defines how to accumulate values of a given type.
"""
@@ -186,6 +188,7 @@ class AccumulatorParam(object):
class AddingAccumulatorParam(AccumulatorParam):
+
"""
An AccumulatorParam that uses the + operators to add values. Designed for simple types
such as integers, floats, and lists. Requires the zero value for the underlying type
@@ -210,6 +213,7 @@ COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)
class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
+
"""
This handler will keep polling updates from the same socket until the
server is shutdown.
@@ -228,7 +232,9 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
# Write a byte in acknowledgement
self.wfile.write(struct.pack("!b", 1))
+
class AccumulatorServer(SocketServer.TCPServer):
+
"""
A simple TCP server that intercepts shutdown() in order to interrupt
our continuous polling on the handler.
@@ -239,6 +245,7 @@ class AccumulatorServer(SocketServer.TCPServer):
self.server_shutdown = True
SocketServer.TCPServer.shutdown(self)
+
def _start_update_server():
"""Start a TCP server to receive accumulator updates in a daemon thread, and returns it"""
server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler)
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index 43f40f8783..f3e64989ed 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -45,6 +45,7 @@ def _from_id(bid):
class Broadcast(object):
+
"""
A broadcast variable created with
L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}.
diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py
index b4c82f519b..fb716f6753 100644
--- a/python/pyspark/conf.py
+++ b/python/pyspark/conf.py
@@ -56,6 +56,7 @@ spark.home=/path
class SparkConf(object):
+
"""
Configuration for a Spark application. Used to set various Spark
parameters as key-value pairs.
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 2e80eb50f2..4001ecab5e 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -47,6 +47,7 @@ DEFAULT_CONFIGS = {
class SparkContext(object):
+
"""
Main entry point for Spark functionality. A SparkContext represents the
connection to a Spark cluster, and can be used to create L{RDD}s and
@@ -213,7 +214,7 @@ class SparkContext(object):
if instance:
if (SparkContext._active_spark_context and
- SparkContext._active_spark_context != instance):
+ SparkContext._active_spark_context != instance):
currentMaster = SparkContext._active_spark_context.master
currentAppName = SparkContext._active_spark_context.appName
callsite = SparkContext._active_spark_context._callsite
@@ -406,7 +407,7 @@ class SparkContext(object):
batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
- keyConverter, valueConverter, minSplits, batchSize)
+ keyConverter, valueConverter, minSplits, batchSize)
return RDD(jrdd, self, ser)
def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -437,7 +438,8 @@ class SparkContext(object):
batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
- valueClass, keyConverter, valueConverter, jconf, batchSize)
+ valueClass, keyConverter, valueConverter,
+ jconf, batchSize)
return RDD(jrdd, self, ser)
def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -465,7 +467,8 @@ class SparkContext(object):
batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
- valueClass, keyConverter, valueConverter, jconf, batchSize)
+ valueClass, keyConverter, valueConverter,
+ jconf, batchSize)
return RDD(jrdd, self, ser)
def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -496,7 +499,8 @@ class SparkContext(object):
batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
- valueClass, keyConverter, valueConverter, jconf, batchSize)
+ valueClass, keyConverter, valueConverter,
+ jconf, batchSize)
return RDD(jrdd, self, ser)
def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -523,8 +527,9 @@ class SparkContext(object):
jconf = self._dictToJavaMap(conf)
batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
- jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
- keyConverter, valueConverter, jconf, batchSize)
+ jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass,
+ valueClass, keyConverter, valueConverter,
+ jconf, batchSize)
return RDD(jrdd, self, ser)
def _checkpointFile(self, name, input_deserializer):
@@ -555,8 +560,7 @@ class SparkContext(object):
first = rdds[0]._jrdd
rest = [x._jrdd for x in rdds[1:]]
rest = ListConverter().convert(rest, self._gateway._gateway_client)
- return RDD(self._jsc.union(first, rest), self,
- rdds[0]._jrdd_deserializer)
+ return RDD(self._jsc.union(first, rest), self, rdds[0]._jrdd_deserializer)
def broadcast(self, value):
"""
@@ -568,8 +572,7 @@ class SparkContext(object):
pickleSer = PickleSerializer()
pickled = pickleSer.dumps(value)
jbroadcast = self._jsc.broadcast(bytearray(pickled))
- return Broadcast(jbroadcast.id(), value, jbroadcast,
- self._pickled_broadcast_vars)
+ return Broadcast(jbroadcast.id(), value, jbroadcast, self._pickled_broadcast_vars)
def accumulator(self, value, accum_param=None):
"""
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index b00da833d0..e73538baf0 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -43,7 +43,7 @@ def worker(sock):
"""
# Redirect stdout to stderr
os.dup2(2, 1)
- sys.stdout = sys.stderr # The sys.stdout object is different from file descriptor 1
+ sys.stdout = sys.stderr # The sys.stdout object is different from file descriptor 1
signal.signal(SIGHUP, SIG_DFL)
signal.signal(SIGCHLD, SIG_DFL)
@@ -134,8 +134,7 @@ def manager():
try:
os.kill(worker_pid, signal.SIGKILL)
except OSError:
- pass # process already died
-
+ pass # process already died
if listen_sock in ready_fds:
sock, addr = listen_sock.accept()
diff --git a/python/pyspark/files.py b/python/pyspark/files.py
index 57ee14eeb7..331de9a9b2 100644
--- a/python/pyspark/files.py
+++ b/python/pyspark/files.py
@@ -19,6 +19,7 @@ import os
class SparkFiles(object):
+
"""
Resolves paths to files added through
L{SparkContext.addFile()<pyspark.context.SparkContext.addFile>}.
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 2c129679f4..37386ab0d7 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -65,6 +65,7 @@ def launch_gateway():
# Create a thread to echo output from the GatewayServer, which is required
# for Java log output to show up:
class EchoOutputThread(Thread):
+
def __init__(self, stream):
Thread.__init__(self)
self.daemon = True
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index 9c1565affb..db341da85f 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -72,9 +72,9 @@ except:
# Python interpreter must agree on what endian the machine is.
-DENSE_VECTOR_MAGIC = 1
+DENSE_VECTOR_MAGIC = 1
SPARSE_VECTOR_MAGIC = 2
-DENSE_MATRIX_MAGIC = 3
+DENSE_MATRIX_MAGIC = 3
LABELED_POINT_MAGIC = 4
@@ -443,6 +443,7 @@ def _serialize_rating(r):
class RatingDeserializer(Serializer):
+
def loads(self, stream):
length = struct.unpack("!i", stream.read(4))[0]
ba = stream.read(length)
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 5ec1a8084d..ffdda7ee19 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -31,6 +31,7 @@ from math import exp, log
class LogisticRegressionModel(LinearModel):
+
"""A linear binary classification model derived from logistic regression.
>>> data = [
@@ -60,6 +61,7 @@ class LogisticRegressionModel(LinearModel):
>>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0
True
"""
+
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
margin = _dot(x, self._coeff) + self._intercept
@@ -72,6 +74,7 @@ class LogisticRegressionModel(LinearModel):
class LogisticRegressionWithSGD(object):
+
@classmethod
def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
initialWeights=None, regParam=1.0, regType=None, intercept=False):
@@ -108,6 +111,7 @@ class LogisticRegressionWithSGD(object):
class SVMModel(LinearModel):
+
"""A support vector machine.
>>> data = [
@@ -131,6 +135,7 @@ class SVMModel(LinearModel):
>>> svm.predict(SparseVector(2, {0: -1.0})) <= 0
True
"""
+
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
margin = _dot(x, self._coeff) + self._intercept
@@ -138,6 +143,7 @@ class SVMModel(LinearModel):
class SVMWithSGD(object):
+
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
miniBatchFraction=1.0, initialWeights=None, regType=None, intercept=False):
@@ -173,6 +179,7 @@ class SVMWithSGD(object):
class NaiveBayesModel(object):
+
"""
Model for Naive Bayes classifiers.
@@ -213,6 +220,7 @@ class NaiveBayesModel(object):
class NaiveBayes(object):
+
@classmethod
def train(cls, data, lambda_=1.0):
"""
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index b380e8f6c8..a0630d1d5c 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -27,6 +27,7 @@ from pyspark.mllib.linalg import SparseVector
class KMeansModel(object):
+
"""A clustering model derived from the k-means method.
>>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
@@ -55,6 +56,7 @@ class KMeansModel(object):
>>> type(model.clusterCenters)
<type 'list'>
"""
+
def __init__(self, centers):
self.centers = centers
@@ -76,6 +78,7 @@ class KMeansModel(object):
class KMeans(object):
+
@classmethod
def train(cls, data, k, maxIterations=100, runs=1, initializationMode="k-means||"):
"""Train a k-means clustering model."""
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 54720c2324..9a239abfbb 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -27,6 +27,7 @@ from numpy import array, array_equal, ndarray, float64, int32
class SparseVector(object):
+
"""
A simple sparse vector class for passing data to MLlib. Users may
alternatively pass SciPy's {scipy.sparse} data types.
@@ -192,6 +193,7 @@ class SparseVector(object):
class Vectors(object):
+
"""
Factory methods for working with vectors. Note that dense vectors
are simply represented as NumPy array objects, so there is no need
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index 36e710dbae..eb496688b6 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -24,7 +24,9 @@ from pyspark.rdd import RDD
from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector
from pyspark.serializers import NoOpSerializer
+
class RandomRDDGenerators:
+
"""
Generator methods for creating RDDs comprised of i.i.d samples from
some distribution.
@@ -53,7 +55,7 @@ class RandomRDDGenerators:
True
"""
jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed)
- uniform = RDD(jrdd, sc, NoOpSerializer())
+ uniform = RDD(jrdd, sc, NoOpSerializer())
return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes)))
@staticmethod
@@ -77,7 +79,7 @@ class RandomRDDGenerators:
True
"""
jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed)
- normal = RDD(jrdd, sc, NoOpSerializer())
+ normal = RDD(jrdd, sc, NoOpSerializer())
return normal.map(lambda bytes: _deserialize_double(bytearray(bytes)))
@staticmethod
@@ -98,7 +100,7 @@ class RandomRDDGenerators:
True
"""
jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed)
- poisson = RDD(jrdd, sc, NoOpSerializer())
+ poisson = RDD(jrdd, sc, NoOpSerializer())
return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes)))
@staticmethod
@@ -118,7 +120,7 @@ class RandomRDDGenerators:
"""
jrdd = sc._jvm.PythonMLLibAPI() \
.uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
- uniform = RDD(jrdd, sc, NoOpSerializer())
+ uniform = RDD(jrdd, sc, NoOpSerializer())
return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
@staticmethod
@@ -138,7 +140,7 @@ class RandomRDDGenerators:
"""
jrdd = sc._jvm.PythonMLLibAPI() \
.normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
- normal = RDD(jrdd, sc, NoOpSerializer())
+ normal = RDD(jrdd, sc, NoOpSerializer())
return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
@staticmethod
@@ -161,7 +163,7 @@ class RandomRDDGenerators:
"""
jrdd = sc._jvm.PythonMLLibAPI() \
.poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed)
- poisson = RDD(jrdd, sc, NoOpSerializer())
+ poisson = RDD(jrdd, sc, NoOpSerializer())
return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 6c385042ff..e863fc249e 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -26,6 +26,7 @@ from pyspark.rdd import RDD
class MatrixFactorizationModel(object):
+
"""A matrix factorisation model trained by regularized alternating
least-squares.
@@ -58,6 +59,7 @@ class MatrixFactorizationModel(object):
class ALS(object):
+
@classmethod
def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
sc = ratings.context
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 041b119269..d8792cf448 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -27,6 +27,7 @@ from pyspark.mllib.linalg import SparseVector, Vectors
class LabeledPoint(object):
+
"""
The features and labels of a data point.
@@ -34,6 +35,7 @@ class LabeledPoint(object):
@param features: Vector of features for this point (NumPy array, list,
pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix)
"""
+
def __init__(self, label, features):
self.label = label
if (type(features) == ndarray or type(features) == SparseVector
@@ -49,7 +51,9 @@ class LabeledPoint(object):
class LinearModel(object):
+
"""A linear model that has a vector of coefficients and an intercept."""
+
def __init__(self, weights, intercept):
self._coeff = weights
self._intercept = intercept
@@ -64,6 +68,7 @@ class LinearModel(object):
class LinearRegressionModelBase(LinearModel):
+
"""A linear regression model.
>>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
@@ -72,6 +77,7 @@ class LinearRegressionModelBase(LinearModel):
>>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
True
"""
+
def predict(self, x):
"""Predict the value of the dependent variable given a vector x"""
"""containing values for the independent variables."""
@@ -80,6 +86,7 @@ class LinearRegressionModelBase(LinearModel):
class LinearRegressionModel(LinearRegressionModelBase):
+
"""A linear regression model derived from a least-squares fit.
>>> from pyspark.mllib.regression import LabeledPoint
@@ -111,6 +118,7 @@ class LinearRegressionModel(LinearRegressionModelBase):
class LinearRegressionWithSGD(object):
+
@classmethod
def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
initialWeights=None, regParam=1.0, regType=None, intercept=False):
@@ -146,6 +154,7 @@ class LinearRegressionWithSGD(object):
class LassoModel(LinearRegressionModelBase):
+
"""A linear regression model derived from a least-squares fit with an
l_1 penalty term.
@@ -178,6 +187,7 @@ class LassoModel(LinearRegressionModelBase):
class LassoWithSGD(object):
+
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
miniBatchFraction=1.0, initialWeights=None):
@@ -189,6 +199,7 @@ class LassoWithSGD(object):
class RidgeRegressionModel(LinearRegressionModelBase):
+
"""A linear regression model derived from a least-squares fit with an
l_2 penalty term.
@@ -221,6 +232,7 @@ class RidgeRegressionModel(LinearRegressionModelBase):
class RidgeRegressionWithSGD(object):
+
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
miniBatchFraction=1.0, initialWeights=None):
diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py
index 0a08a562d1..982906b9d0 100644
--- a/python/pyspark/mllib/stat.py
+++ b/python/pyspark/mllib/stat.py
@@ -24,6 +24,7 @@ from pyspark.mllib._common import \
_serialize_double, _serialize_double_vector, \
_deserialize_double, _deserialize_double_matrix
+
class Statistics(object):
@staticmethod
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 9d1e5be637..6f3ec8ac94 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -39,6 +39,7 @@ except:
class VectorTests(unittest.TestCase):
+
def test_serialize(self):
sv = SparseVector(4, {1: 1, 3: 2})
dv = array([1., 2., 3., 4.])
@@ -81,6 +82,7 @@ class VectorTests(unittest.TestCase):
class ListTests(PySparkTestCase):
+
"""
Test MLlib algorithms on plain lists, to make sure they're passed through
as NumPy arrays.
@@ -128,7 +130,7 @@ class ListTests(PySparkTestCase):
self.assertTrue(nb_model.predict(features[2]) <= 0)
self.assertTrue(nb_model.predict(features[3]) > 0)
- categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories
+ categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories
dt_model = \
DecisionTree.trainClassifier(rdd, numClasses=2,
categoricalFeaturesInfo=categoricalFeaturesInfo)
@@ -168,7 +170,7 @@ class ListTests(PySparkTestCase):
self.assertTrue(rr_model.predict(features[2]) <= 0)
self.assertTrue(rr_model.predict(features[3]) > 0)
- categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories
+ categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories
dt_model = \
DecisionTree.trainRegressor(rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
self.assertTrue(dt_model.predict(features[0]) <= 0)
@@ -179,6 +181,7 @@ class ListTests(PySparkTestCase):
@unittest.skipIf(not _have_scipy, "SciPy not installed")
class SciPyTests(PySparkTestCase):
+
"""
Test both vector operations and MLlib algorithms with SciPy sparse matrices,
if SciPy is available.
@@ -276,7 +279,7 @@ class SciPyTests(PySparkTestCase):
self.assertTrue(nb_model.predict(features[2]) <= 0)
self.assertTrue(nb_model.predict(features[3]) > 0)
- categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories
+ categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories
dt_model = DecisionTree.trainClassifier(rdd, numClasses=2,
categoricalFeaturesInfo=categoricalFeaturesInfo)
self.assertTrue(dt_model.predict(features[0]) <= 0)
@@ -315,7 +318,7 @@ class SciPyTests(PySparkTestCase):
self.assertTrue(rr_model.predict(features[2]) <= 0)
self.assertTrue(rr_model.predict(features[3]) > 0)
- categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories
+ categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories
dt_model = DecisionTree.trainRegressor(rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
self.assertTrue(dt_model.predict(features[0]) <= 0)
self.assertTrue(dt_model.predict(features[1]) > 0)
diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py
index 1e0006df75..2518001ea0 100644
--- a/python/pyspark/mllib/tree.py
+++ b/python/pyspark/mllib/tree.py
@@ -25,7 +25,9 @@ from pyspark.mllib._common import \
from pyspark.mllib.regression import LabeledPoint
from pyspark.serializers import NoOpSerializer
+
class DecisionTreeModel(object):
+
"""
A decision tree model for classification or regression.
@@ -77,6 +79,7 @@ class DecisionTreeModel(object):
class DecisionTree(object):
+
"""
Learning algorithm for a decision tree model
for classification or regression.
@@ -174,7 +177,6 @@ class DecisionTree(object):
categoricalFeaturesInfo,
impurity, maxDepth, maxBins)
-
@staticmethod
def train(data, algo, numClasses, categoricalFeaturesInfo,
impurity, maxDepth, maxBins=100):
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 639cda6350..4962d05491 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -26,6 +26,7 @@ from pyspark.serializers import NoOpSerializer
class MLUtils:
+
"""
Helper methods to load, save and pre-process data used in MLlib.
"""
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 309f5a9b60..30b834d208 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -233,7 +233,7 @@ class RDD(object):
def _toPickleSerialization(self):
if (self._jrdd_deserializer == PickleSerializer() or
- self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
+ self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
return self
else:
return self._reserialize(BatchedSerializer(PickleSerializer(), 10))
@@ -1079,7 +1079,9 @@ class RDD(object):
pickledRDD = self._toPickleSerialization()
batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
- outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf)
+ outputFormatClass,
+ keyClass, valueClass,
+ keyConverter, valueConverter, jconf)
def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
"""
@@ -1125,8 +1127,10 @@ class RDD(object):
pickledRDD = self._toPickleSerialization()
batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
- outputFormatClass, keyClass, valueClass, keyConverter, valueConverter,
- jconf, compressionCodecClass)
+ outputFormatClass,
+ keyClass, valueClass,
+ keyConverter, valueConverter,
+ jconf, compressionCodecClass)
def saveAsSequenceFile(self, path, compressionCodecClass=None):
"""
@@ -1348,7 +1352,7 @@ class RDD(object):
outputSerializer = self.ctx._unbatched_serializer
limit = (_parse_memory(self.ctx._conf.get(
- "spark.python.worker.memory", "512m")) / 2)
+ "spark.python.worker.memory", "512m")) / 2)
def add_shuffle_key(split, iterator):
@@ -1430,12 +1434,12 @@ class RDD(object):
spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower()
== 'true')
memory = _parse_memory(self.ctx._conf.get(
- "spark.python.worker.memory", "512m"))
+ "spark.python.worker.memory", "512m"))
agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
def combineLocally(iterator):
merger = ExternalMerger(agg, memory * 0.9, serializer) \
- if spill else InMemoryMerger(agg)
+ if spill else InMemoryMerger(agg)
merger.mergeValues(iterator)
return merger.iteritems()
@@ -1444,7 +1448,7 @@ class RDD(object):
def _mergeCombiners(iterator):
merger = ExternalMerger(agg, memory, serializer) \
- if spill else InMemoryMerger(agg)
+ if spill else InMemoryMerger(agg)
merger.mergeCombiners(iterator)
return merger.iteritems()
@@ -1588,7 +1592,7 @@ class RDD(object):
"""
for fraction in fractions.values():
assert fraction >= 0.0, "Negative fraction value: %s" % fraction
- return self.mapPartitionsWithIndex( \
+ return self.mapPartitionsWithIndex(
RDDStratifiedSampler(withReplacement, fractions, seed).func, True)
def subtractByKey(self, other, numPartitions=None):
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index 2df000fdb0..55e247da0e 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -20,6 +20,7 @@ import random
class RDDSamplerBase(object):
+
def __init__(self, withReplacement, seed=None):
try:
import numpy
@@ -95,6 +96,7 @@ class RDDSamplerBase(object):
class RDDSampler(RDDSamplerBase):
+
def __init__(self, withReplacement, fraction, seed=None):
RDDSamplerBase.__init__(self, withReplacement, seed)
self._fraction = fraction
@@ -113,7 +115,9 @@ class RDDSampler(RDDSamplerBase):
if self.getUniformSample(split) <= self._fraction:
yield obj
+
class RDDStratifiedSampler(RDDSamplerBase):
+
def __init__(self, withReplacement, fractions, seed=None):
RDDSamplerBase.__init__(self, withReplacement, seed)
self._fractions = fractions
diff --git a/python/pyspark/resultiterable.py b/python/pyspark/resultiterable.py
index df34740fc8..ef04c82866 100644
--- a/python/pyspark/resultiterable.py
+++ b/python/pyspark/resultiterable.py
@@ -21,9 +21,11 @@ import collections
class ResultIterable(collections.Iterable):
+
"""
A special result iterable. This is used because the standard iterator can not be pickled
"""
+
def __init__(self, data):
self.data = data
self.index = 0
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index a10f85b55a..b35558db3e 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -111,6 +111,7 @@ class Serializer(object):
class FramedSerializer(Serializer):
+
"""
Serializer that writes objects as a stream of (length, data) pairs,
where C{length} is a 32-bit integer and data is C{length} bytes.
@@ -162,6 +163,7 @@ class FramedSerializer(Serializer):
class BatchedSerializer(Serializer):
+
"""
Serializes a stream of objects in batches by calling its wrapped
Serializer with streams of objects.
@@ -207,6 +209,7 @@ class BatchedSerializer(Serializer):
class CartesianDeserializer(FramedSerializer):
+
"""
Deserializes the JavaRDD cartesian() of two PythonRDDs.
"""
@@ -240,6 +243,7 @@ class CartesianDeserializer(FramedSerializer):
class PairDeserializer(CartesianDeserializer):
+
"""
Deserializes the JavaRDD zip() of two PythonRDDs.
"""
@@ -289,6 +293,7 @@ def _hack_namedtuple(cls):
""" Make class generated by namedtuple picklable """
name = cls.__name__
fields = cls._fields
+
def __reduce__(self):
return (_restore, (name, fields, tuple(self)))
cls.__reduce__ = __reduce__
@@ -301,10 +306,11 @@ def _hijack_namedtuple():
if hasattr(collections.namedtuple, "__hijack"):
return
- global _old_namedtuple # or it will put in closure
+ global _old_namedtuple # or it will put in closure
+
def _copy_func(f):
return types.FunctionType(f.func_code, f.func_globals, f.func_name,
- f.func_defaults, f.func_closure)
+ f.func_defaults, f.func_closure)
_old_namedtuple = _copy_func(collections.namedtuple)
@@ -323,15 +329,16 @@ def _hijack_namedtuple():
# so only hack those in __main__ module
for n, o in sys.modules["__main__"].__dict__.iteritems():
if (type(o) is type and o.__base__ is tuple
- and hasattr(o, "_fields")
- and "__reduce__" not in o.__dict__):
- _hack_namedtuple(o) # hack inplace
+ and hasattr(o, "_fields")
+ and "__reduce__" not in o.__dict__):
+ _hack_namedtuple(o) # hack inplace
_hijack_namedtuple()
class PickleSerializer(FramedSerializer):
+
"""
Serializes objects using Python's cPickle serializer:
@@ -354,6 +361,7 @@ class CloudPickleSerializer(PickleSerializer):
class MarshalSerializer(FramedSerializer):
+
"""
Serializes objects using Python's Marshal serializer:
@@ -367,9 +375,11 @@ class MarshalSerializer(FramedSerializer):
class AutoSerializer(FramedSerializer):
+
"""
Choose marshal or cPickle as serialization protocol autumatically
"""
+
def __init__(self):
FramedSerializer.__init__(self)
self._type = None
@@ -394,6 +404,7 @@ class AutoSerializer(FramedSerializer):
class UTF8Deserializer(Serializer):
+
"""
Deserializes streams written by String.getBytes.
"""
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index e3923d1c36..2c68cd4921 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -45,7 +45,7 @@ except ImportError:
return int(line.split()[1]) >> 10
else:
warnings.warn("Please install psutil to have better "
- "support with spilling")
+ "support with spilling")
if platform.system() == "Darwin":
import resource
rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
@@ -141,7 +141,7 @@ class ExternalMerger(Merger):
This class works as follows:
- - It repeatedly combine the items and save them in one dict in
+ - It repeatedly combine the items and save them in one dict in
memory.
- When the used memory goes above memory limit, it will split
@@ -190,12 +190,12 @@ class ExternalMerger(Merger):
MAX_TOTAL_PARTITIONS = 4096
def __init__(self, aggregator, memory_limit=512, serializer=None,
- localdirs=None, scale=1, partitions=59, batch=1000):
+ localdirs=None, scale=1, partitions=59, batch=1000):
Merger.__init__(self, aggregator)
self.memory_limit = memory_limit
# default serializer is only used for tests
self.serializer = serializer or \
- BatchedSerializer(PickleSerializer(), 1024)
+ BatchedSerializer(PickleSerializer(), 1024)
self.localdirs = localdirs or self._get_dirs()
# number of partitions when spill data into disks
self.partitions = partitions
@@ -341,7 +341,7 @@ class ExternalMerger(Merger):
self.pdata[i].clear()
self.spills += 1
- gc.collect() # release the memory as much as possible
+ gc.collect() # release the memory as much as possible
def iteritems(self):
""" Return all merged items as iterator """
@@ -370,8 +370,8 @@ class ExternalMerger(Merger):
if (self.scale * self.partitions < self.MAX_TOTAL_PARTITIONS
and j < self.spills - 1
and get_used_memory() > hard_limit):
- self.data.clear() # will read from disk again
- gc.collect() # release the memory as much as possible
+ self.data.clear() # will read from disk again
+ gc.collect() # release the memory as much as possible
for v in self._recursive_merged_items(i):
yield v
return
@@ -409,9 +409,9 @@ class ExternalMerger(Merger):
for i in range(start, self.partitions):
subdirs = [os.path.join(d, "parts", str(i))
- for d in self.localdirs]
+ for d in self.localdirs]
m = ExternalMerger(self.agg, self.memory_limit, self.serializer,
- subdirs, self.scale * self.partitions)
+ subdirs, self.scale * self.partitions)
m.pdata = [{} for _ in range(self.partitions)]
limit = self._next_limit()
@@ -419,7 +419,7 @@ class ExternalMerger(Merger):
path = self._get_spill_dir(j)
p = os.path.join(path, str(i))
m._partitioned_mergeCombiners(
- self.serializer.load_stream(open(p)))
+ self.serializer.load_stream(open(p)))
if get_used_memory() > limit:
m._spill()
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index adc56e7ec0..950e275adb 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -45,6 +45,7 @@ __all__ = [
class DataType(object):
+
"""Spark SQL DataType"""
def __repr__(self):
@@ -62,6 +63,7 @@ class DataType(object):
class PrimitiveTypeSingleton(type):
+
"""Metaclass for PrimitiveType"""
_instances = {}
@@ -73,6 +75,7 @@ class PrimitiveTypeSingleton(type):
class PrimitiveType(DataType):
+
"""Spark SQL PrimitiveType"""
__metaclass__ = PrimitiveTypeSingleton
@@ -83,6 +86,7 @@ class PrimitiveType(DataType):
class StringType(PrimitiveType):
+
"""Spark SQL StringType
The data type representing string values.
@@ -90,6 +94,7 @@ class StringType(PrimitiveType):
class BinaryType(PrimitiveType):
+
"""Spark SQL BinaryType
The data type representing bytearray values.
@@ -97,6 +102,7 @@ class BinaryType(PrimitiveType):
class BooleanType(PrimitiveType):
+
"""Spark SQL BooleanType
The data type representing bool values.
@@ -104,6 +110,7 @@ class BooleanType(PrimitiveType):
class TimestampType(PrimitiveType):
+
"""Spark SQL TimestampType
The data type representing datetime.datetime values.
@@ -111,6 +118,7 @@ class TimestampType(PrimitiveType):
class DecimalType(PrimitiveType):
+
"""Spark SQL DecimalType
The data type representing decimal.Decimal values.
@@ -118,6 +126,7 @@ class DecimalType(PrimitiveType):
class DoubleType(PrimitiveType):
+
"""Spark SQL DoubleType
The data type representing float values.
@@ -125,6 +134,7 @@ class DoubleType(PrimitiveType):
class FloatType(PrimitiveType):
+
"""Spark SQL FloatType
The data type representing single precision floating-point values.
@@ -132,6 +142,7 @@ class FloatType(PrimitiveType):
class ByteType(PrimitiveType):
+
"""Spark SQL ByteType
The data type representing int values with 1 singed byte.
@@ -139,6 +150,7 @@ class ByteType(PrimitiveType):
class IntegerType(PrimitiveType):
+
"""Spark SQL IntegerType
The data type representing int values.
@@ -146,6 +158,7 @@ class IntegerType(PrimitiveType):
class LongType(PrimitiveType):
+
"""Spark SQL LongType
The data type representing long values. If the any value is
@@ -155,6 +168,7 @@ class LongType(PrimitiveType):
class ShortType(PrimitiveType):
+
"""Spark SQL ShortType
The data type representing int values with 2 signed bytes.
@@ -162,6 +176,7 @@ class ShortType(PrimitiveType):
class ArrayType(DataType):
+
"""Spark SQL ArrayType
The data type representing list values. An ArrayType object
@@ -187,10 +202,11 @@ class ArrayType(DataType):
def __str__(self):
return "ArrayType(%s,%s)" % (self.elementType,
- str(self.containsNull).lower())
+ str(self.containsNull).lower())
class MapType(DataType):
+
"""Spark SQL MapType
The data type representing dict values. A MapType object comprises
@@ -226,10 +242,11 @@ class MapType(DataType):
def __repr__(self):
return "MapType(%s,%s,%s)" % (self.keyType, self.valueType,
- str(self.valueContainsNull).lower())
+ str(self.valueContainsNull).lower())
class StructField(DataType):
+
"""Spark SQL StructField
Represents a field in a StructType.
@@ -263,10 +280,11 @@ class StructField(DataType):
def __repr__(self):
return "StructField(%s,%s,%s)" % (self.name, self.dataType,
- str(self.nullable).lower())
+ str(self.nullable).lower())
class StructType(DataType):
+
"""Spark SQL StructType
The data type representing rows.
@@ -291,7 +309,7 @@ class StructType(DataType):
def __repr__(self):
return ("StructType(List(%s))" %
- ",".join(str(field) for field in self.fields))
+ ",".join(str(field) for field in self.fields))
def _parse_datatype_list(datatype_list_string):
@@ -319,7 +337,7 @@ def _parse_datatype_list(datatype_list_string):
_all_primitive_types = dict((k, v) for k, v in globals().iteritems()
- if type(v) is PrimitiveTypeSingleton and v.__base__ == PrimitiveType)
+ if type(v) is PrimitiveTypeSingleton and v.__base__ == PrimitiveType)
def _parse_datatype_string(datatype_string):
@@ -459,16 +477,16 @@ def _infer_schema(row):
items = sorted(row.items())
elif isinstance(row, tuple):
- if hasattr(row, "_fields"): # namedtuple
+ if hasattr(row, "_fields"): # namedtuple
items = zip(row._fields, tuple(row))
- elif hasattr(row, "__FIELDS__"): # Row
+ elif hasattr(row, "__FIELDS__"): # Row
items = zip(row.__FIELDS__, tuple(row))
elif all(isinstance(x, tuple) and len(x) == 2 for x in row):
items = row
else:
raise ValueError("Can't infer schema from tuple")
- elif hasattr(row, "__dict__"): # object
+ elif hasattr(row, "__dict__"): # object
items = sorted(row.__dict__.items())
else:
@@ -499,7 +517,7 @@ def _create_converter(obj, dataType):
conv = lambda o: tuple(o.get(n) for n in names)
elif isinstance(obj, tuple):
- if hasattr(obj, "_fields"): # namedtuple
+ if hasattr(obj, "_fields"): # namedtuple
conv = tuple
elif hasattr(obj, "__FIELDS__"):
conv = tuple
@@ -508,7 +526,7 @@ def _create_converter(obj, dataType):
else:
raise ValueError("unexpected tuple")
- elif hasattr(obj, "__dict__"): # object
+ elif hasattr(obj, "__dict__"): # object
conv = lambda o: [o.__dict__.get(n, None) for n in names]
nested = any(_has_struct(f.dataType) for f in dataType.fields)
@@ -660,7 +678,7 @@ def _infer_schema_type(obj, dataType):
assert len(fs) == len(obj), \
"Obj(%s) have different length with fields(%s)" % (obj, fs)
fields = [StructField(f.name, _infer_schema_type(o, f.dataType), True)
- for o, f in zip(obj, fs)]
+ for o, f in zip(obj, fs)]
return StructType(fields)
else:
@@ -683,6 +701,7 @@ _acceptable_types = {
StructType: (tuple, list),
}
+
def _verify_type(obj, dataType):
"""
Verify the type of obj against dataType, raise an exception if
@@ -728,7 +747,7 @@ def _verify_type(obj, dataType):
elif isinstance(dataType, StructType):
if len(obj) != len(dataType.fields):
raise ValueError("Length of object (%d) does not match with"
- "length of fields (%d)" % (len(obj), len(dataType.fields)))
+ "length of fields (%d)" % (len(obj), len(dataType.fields)))
for v, f in zip(obj, dataType.fields):
_verify_type(v, f.dataType)
@@ -861,6 +880,7 @@ def _create_cls(dataType):
raise Exception("unexpected data type: %s" % dataType)
class Row(tuple):
+
""" Row in SchemaRDD """
__DATATYPE__ = dataType
__FIELDS__ = tuple(f.name for f in dataType.fields)
@@ -872,7 +892,7 @@ def _create_cls(dataType):
def __repr__(self):
# call collect __repr__ for nested objects
return ("Row(%s)" % ", ".join("%s=%r" % (n, getattr(self, n))
- for n in self.__FIELDS__))
+ for n in self.__FIELDS__))
def __reduce__(self):
return (_restore_object, (self.__DATATYPE__, tuple(self)))
@@ -881,6 +901,7 @@ def _create_cls(dataType):
class SQLContext:
+
"""Main entry point for SparkSQL functionality.
A SQLContext can be used create L{SchemaRDD}s, register L{SchemaRDD}s as
@@ -960,7 +981,7 @@ class SQLContext:
env = MapConverter().convert(self._sc.environment,
self._sc._gateway._gateway_client)
includes = ListConverter().convert(self._sc._python_includes,
- self._sc._gateway._gateway_client)
+ self._sc._gateway._gateway_client)
self._ssql_ctx.registerPython(name,
bytearray(CloudPickleSerializer().dumps(command)),
env,
@@ -1012,7 +1033,7 @@ class SQLContext:
first = rdd.first()
if not first:
raise ValueError("The first row in RDD is empty, "
- "can not infer schema")
+ "can not infer schema")
if type(first) is dict:
warnings.warn("Using RDD of dict to inferSchema is deprecated")
@@ -1287,6 +1308,7 @@ class SQLContext:
class HiveContext(SQLContext):
+
"""A variant of Spark SQL that integrates with data stored in Hive.
Configuration for Hive is read from hive-site.xml on the classpath.
@@ -1327,6 +1349,7 @@ class HiveContext(SQLContext):
class LocalHiveContext(HiveContext):
+
"""Starts up an instance of hive where metadata is stored locally.
An in-process metadata data is created with data stored in ./metadata.
@@ -1357,7 +1380,7 @@ class LocalHiveContext(HiveContext):
def __init__(self, sparkContext, sqlContext=None):
HiveContext.__init__(self, sparkContext, sqlContext)
warnings.warn("LocalHiveContext is deprecated. "
- "Use HiveContext instead.", DeprecationWarning)
+ "Use HiveContext instead.", DeprecationWarning)
def _get_hive_ctx(self):
return self._jvm.LocalHiveContext(self._jsc.sc())
@@ -1376,6 +1399,7 @@ def _create_row(fields, values):
class Row(tuple):
+
"""
A row in L{SchemaRDD}. The fields in it can be accessed like attributes.
@@ -1417,7 +1441,6 @@ class Row(tuple):
else:
raise ValueError("No args or kwargs")
-
# let obect acs like class
def __call__(self, *args):
"""create new Row object"""
@@ -1443,12 +1466,13 @@ class Row(tuple):
def __repr__(self):
if hasattr(self, "__FIELDS__"):
return "Row(%s)" % ", ".join("%s=%r" % (k, v)
- for k, v in zip(self.__FIELDS__, self))
+ for k, v in zip(self.__FIELDS__, self))
else:
return "<Row(%s)>" % ", ".join(self)
class SchemaRDD(RDD):
+
"""An RDD of L{Row} objects that has an associated schema.
The underlying JVM object is a SchemaRDD, not a PythonRDD, so we can
@@ -1659,7 +1683,7 @@ class SchemaRDD(RDD):
rdd = self._jschema_rdd.subtract(other._jschema_rdd)
else:
rdd = self._jschema_rdd.subtract(other._jschema_rdd,
- numPartitions)
+ numPartitions)
return SchemaRDD(rdd, self.sql_ctx)
else:
raise ValueError("Can only subtract another SchemaRDD")
@@ -1686,9 +1710,9 @@ def _test():
jsonStrings = [
'{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
'{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},'
- '"field6":[{"field7": "row2"}]}',
+ '"field6":[{"field7": "row2"}]}',
'{"field1" : null, "field2": "row3", '
- '"field3":{"field4":33, "field5": []}}'
+ '"field3":{"field4":33, "field5": []}}'
]
globs['jsonStrings'] = jsonStrings
globs['json'] = sc.parallelize(jsonStrings)
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
index 5d77a131f2..2aa0fb9d2c 100644
--- a/python/pyspark/storagelevel.py
+++ b/python/pyspark/storagelevel.py
@@ -19,6 +19,7 @@ __all__ = ["StorageLevel"]
class StorageLevel:
+
"""
Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 4ac94ba729..88a61176e5 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -62,53 +62,53 @@ class TestMerger(unittest.TestCase):
self.N = 1 << 16
self.l = [i for i in xrange(self.N)]
self.data = zip(self.l, self.l)
- self.agg = Aggregator(lambda x: [x],
- lambda x, y: x.append(y) or x,
- lambda x, y: x.extend(y) or x)
+ self.agg = Aggregator(lambda x: [x],
+ lambda x, y: x.append(y) or x,
+ lambda x, y: x.extend(y) or x)
def test_in_memory(self):
m = InMemoryMerger(self.agg)
m.mergeValues(self.data)
self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
- sum(xrange(self.N)))
+ sum(xrange(self.N)))
m = InMemoryMerger(self.agg)
m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data))
self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
- sum(xrange(self.N)))
+ sum(xrange(self.N)))
def test_small_dataset(self):
m = ExternalMerger(self.agg, 1000)
m.mergeValues(self.data)
self.assertEqual(m.spills, 0)
self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
- sum(xrange(self.N)))
+ sum(xrange(self.N)))
m = ExternalMerger(self.agg, 1000)
m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data))
self.assertEqual(m.spills, 0)
self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
- sum(xrange(self.N)))
+ sum(xrange(self.N)))
def test_medium_dataset(self):
m = ExternalMerger(self.agg, 10)
m.mergeValues(self.data)
self.assertTrue(m.spills >= 1)
self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
- sum(xrange(self.N)))
+ sum(xrange(self.N)))
m = ExternalMerger(self.agg, 10)
m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data * 3))
self.assertTrue(m.spills >= 1)
self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
- sum(xrange(self.N)) * 3)
+ sum(xrange(self.N)) * 3)
def test_huge_dataset(self):
m = ExternalMerger(self.agg, 10)
m.mergeCombiners(map(lambda (k, v): (k, [str(v)]), self.data * 10))
self.assertTrue(m.spills >= 1)
self.assertEqual(sum(len(v) for k, v in m._recursive_merged_items(0)),
- self.N * 10)
+ self.N * 10)
m._cleanup()
@@ -188,6 +188,7 @@ class TestAddFile(PySparkTestCase):
log4j = self.sc._jvm.org.apache.log4j
old_level = log4j.LogManager.getRootLogger().getLevel()
log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)
+
def func(x):
from userlibrary import UserClass
return UserClass().hello()
@@ -355,8 +356,8 @@ class TestInputFormat(PySparkTestCase):
self.assertEqual(doubles, ed)
bytes = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbytes/",
- "org.apache.hadoop.io.IntWritable",
- "org.apache.hadoop.io.BytesWritable").collect())
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.BytesWritable").collect())
ebs = [(1, bytearray('aa', 'utf-8')),
(1, bytearray('aa', 'utf-8')),
(2, bytearray('aa', 'utf-8')),
@@ -428,9 +429,9 @@ class TestInputFormat(PySparkTestCase):
self.assertEqual(clazz[0], ec)
unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
- "org.apache.hadoop.io.Text",
- "org.apache.spark.api.python.TestWritable",
- batchSize=1).collect())
+ "org.apache.hadoop.io.Text",
+ "org.apache.spark.api.python.TestWritable",
+ batchSize=1).collect())
self.assertEqual(unbatched_clazz[0], ec)
def test_oldhadoop(self):
@@ -443,7 +444,7 @@ class TestInputFormat(PySparkTestCase):
self.assertEqual(ints, ei)
hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
- oldconf = {"mapred.input.dir" : hellopath}
+ oldconf = {"mapred.input.dir": hellopath}
hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text",
@@ -462,7 +463,7 @@ class TestInputFormat(PySparkTestCase):
self.assertEqual(ints, ei)
hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
- newconf = {"mapred.input.dir" : hellopath}
+ newconf = {"mapred.input.dir": hellopath}
hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text",
@@ -517,6 +518,7 @@ class TestInputFormat(PySparkTestCase):
(u'\x03', [2.0])]
self.assertEqual(maps, em)
+
class TestOutputFormat(PySparkTestCase):
def setUp(self):
@@ -574,8 +576,8 @@ class TestOutputFormat(PySparkTestCase):
def test_oldhadoop(self):
basepath = self.tempdir.name
dict_data = [(1, {}),
- (1, {"row1" : 1.0}),
- (2, {"row2" : 2.0})]
+ (1, {"row1": 1.0}),
+ (2, {"row2": 2.0})]
self.sc.parallelize(dict_data).saveAsHadoopFile(
basepath + "/oldhadoop/",
"org.apache.hadoop.mapred.SequenceFileOutputFormat",
@@ -589,12 +591,13 @@ class TestOutputFormat(PySparkTestCase):
self.assertEqual(result, dict_data)
conf = {
- "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
- "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
- "mapred.output.value.class" : "org.apache.hadoop.io.MapWritable",
- "mapred.output.dir" : basepath + "/olddataset/"}
+ "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class": "org.apache.hadoop.io.MapWritable",
+ "mapred.output.dir": basepath + "/olddataset/"
+ }
self.sc.parallelize(dict_data).saveAsHadoopDataset(conf)
- input_conf = {"mapred.input.dir" : basepath + "/olddataset/"}
+ input_conf = {"mapred.input.dir": basepath + "/olddataset/"}
old_dataset = sorted(self.sc.hadoopRDD(
"org.apache.hadoop.mapred.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
@@ -622,14 +625,17 @@ class TestOutputFormat(PySparkTestCase):
valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
self.assertEqual(result, array_data)
- conf = {"mapreduce.outputformat.class" :
- "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
- "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
- "mapred.output.value.class" : "org.apache.spark.api.python.DoubleArrayWritable",
- "mapred.output.dir" : basepath + "/newdataset/"}
- self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(conf,
+ conf = {
+ "mapreduce.outputformat.class":
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class": "org.apache.spark.api.python.DoubleArrayWritable",
+ "mapred.output.dir": basepath + "/newdataset/"
+ }
+ self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(
+ conf,
valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
- input_conf = {"mapred.input.dir" : basepath + "/newdataset/"}
+ input_conf = {"mapred.input.dir": basepath + "/newdataset/"}
new_dataset = sorted(self.sc.newAPIHadoopRDD(
"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
@@ -640,7 +646,7 @@ class TestOutputFormat(PySparkTestCase):
def test_newolderror(self):
basepath = self.tempdir.name
- rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+ rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
basepath + "/newolderror/saveAsHadoopFile/",
"org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"))
@@ -650,7 +656,7 @@ class TestOutputFormat(PySparkTestCase):
def test_bad_inputs(self):
basepath = self.tempdir.name
- rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+ rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
basepath + "/badinputs/saveAsHadoopFile/",
"org.apache.hadoop.mapred.NotValidOutputFormat"))
@@ -685,30 +691,32 @@ class TestOutputFormat(PySparkTestCase):
result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect())
self.assertEqual(result1, data)
- rdd.saveAsHadoopFile(basepath + "/reserialize/hadoop",
- "org.apache.hadoop.mapred.SequenceFileOutputFormat")
+ rdd.saveAsHadoopFile(
+ basepath + "/reserialize/hadoop",
+ "org.apache.hadoop.mapred.SequenceFileOutputFormat")
result2 = sorted(self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect())
self.assertEqual(result2, data)
- rdd.saveAsNewAPIHadoopFile(basepath + "/reserialize/newhadoop",
- "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
+ rdd.saveAsNewAPIHadoopFile(
+ basepath + "/reserialize/newhadoop",
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
result3 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect())
self.assertEqual(result3, data)
conf4 = {
- "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
- "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
- "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable",
- "mapred.output.dir" : basepath + "/reserialize/dataset"}
+ "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class": "org.apache.hadoop.io.IntWritable",
+ "mapred.output.dir": basepath + "/reserialize/dataset"}
rdd.saveAsHadoopDataset(conf4)
result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect())
self.assertEqual(result4, data)
- conf5 = {"mapreduce.outputformat.class" :
- "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
- "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
- "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable",
- "mapred.output.dir" : basepath + "/reserialize/newdataset"}
+ conf5 = {"mapreduce.outputformat.class":
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class": "org.apache.hadoop.io.IntWritable",
+ "mapred.output.dir": basepath + "/reserialize/newdataset"}
rdd.saveAsNewAPIHadoopDataset(conf5)
result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
self.assertEqual(result5, data)
@@ -719,25 +727,28 @@ class TestOutputFormat(PySparkTestCase):
self.sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile(
basepath + "/unbatched/")
- unbatched_sequence = sorted(self.sc.sequenceFile(basepath + "/unbatched/",
+ unbatched_sequence = sorted(self.sc.sequenceFile(
+ basepath + "/unbatched/",
batchSize=1).collect())
self.assertEqual(unbatched_sequence, ei)
- unbatched_hadoopFile = sorted(self.sc.hadoopFile(basepath + "/unbatched/",
+ unbatched_hadoopFile = sorted(self.sc.hadoopFile(
+ basepath + "/unbatched/",
"org.apache.hadoop.mapred.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.Text",
batchSize=1).collect())
self.assertEqual(unbatched_hadoopFile, ei)
- unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(basepath + "/unbatched/",
+ unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(
+ basepath + "/unbatched/",
"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.Text",
batchSize=1).collect())
self.assertEqual(unbatched_newAPIHadoopFile, ei)
- oldconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+ oldconf = {"mapred.input.dir": basepath + "/unbatched/"}
unbatched_hadoopRDD = sorted(self.sc.hadoopRDD(
"org.apache.hadoop.mapred.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
@@ -746,7 +757,7 @@ class TestOutputFormat(PySparkTestCase):
batchSize=1).collect())
self.assertEqual(unbatched_hadoopRDD, ei)
- newconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+ newconf = {"mapred.input.dir": basepath + "/unbatched/"}
unbatched_newAPIHadoopRDD = sorted(self.sc.newAPIHadoopRDD(
"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
@@ -763,7 +774,9 @@ class TestOutputFormat(PySparkTestCase):
self.assertRaises(Exception, lambda: rdd.saveAsSequenceFile(
basepath + "/malformed/sequence"))
+
class TestDaemon(unittest.TestCase):
+
def connect(self, port):
from socket import socket, AF_INET, SOCK_STREAM
sock = socket(AF_INET, SOCK_STREAM)
@@ -810,12 +823,15 @@ class TestDaemon(unittest.TestCase):
class TestWorker(PySparkTestCase):
+
def test_cancel_task(self):
temp = tempfile.NamedTemporaryFile(delete=True)
temp.close()
path = temp.name
+
def sleep(x):
- import os, time
+ import os
+ import time
with open(path, 'w') as f:
f.write("%d %d" % (os.getppid(), os.getpid()))
time.sleep(100)
@@ -845,7 +861,7 @@ class TestWorker(PySparkTestCase):
os.kill(worker_pid, 0)
time.sleep(0.1)
except OSError:
- break # worker was killed
+ break # worker was killed
else:
self.fail("worker has not been killed after 5 seconds")
@@ -855,12 +871,13 @@ class TestWorker(PySparkTestCase):
self.fail("daemon had been killed")
def test_fd_leak(self):
- N = 1100 # fd limit is 1024 by default
+ N = 1100 # fd limit is 1024 by default
rdd = self.sc.parallelize(range(N), N)
self.assertEquals(N, rdd.count())
class TestSparkSubmit(unittest.TestCase):
+
def setUp(self):
self.programDir = tempfile.mkdtemp()
self.sparkSubmit = os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit")
@@ -953,9 +970,9 @@ class TestSparkSubmit(unittest.TestCase):
|def myfunc(x):
| return x + 1
""")
- proc = subprocess.Popen(
- [self.sparkSubmit, "--py-files", zip, "--master", "local-cluster[1,1,512]", script],
- stdout=subprocess.PIPE)
+ proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, "--master",
+ "local-cluster[1,1,512]", script],
+ stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out)
@@ -981,6 +998,7 @@ class TestSparkSubmit(unittest.TestCase):
@unittest.skipIf(not _have_scipy, "SciPy not installed")
class SciPyTests(PySparkTestCase):
+
"""General PySpark tests that depend on scipy """
def test_serialize(self):
@@ -993,15 +1011,16 @@ class SciPyTests(PySparkTestCase):
@unittest.skipIf(not _have_numpy, "NumPy not installed")
class NumPyTests(PySparkTestCase):
+
"""General PySpark tests that depend on numpy """
def test_statcounter_array(self):
- x = self.sc.parallelize([np.array([1.0,1.0]), np.array([2.0,2.0]), np.array([3.0,3.0])])
+ x = self.sc.parallelize([np.array([1.0, 1.0]), np.array([2.0, 2.0]), np.array([3.0, 3.0])])
s = x.stats()
- self.assertSequenceEqual([2.0,2.0], s.mean().tolist())
- self.assertSequenceEqual([1.0,1.0], s.min().tolist())
- self.assertSequenceEqual([3.0,3.0], s.max().tolist())
- self.assertSequenceEqual([1.0,1.0], s.sampleStdev().tolist())
+ self.assertSequenceEqual([2.0, 2.0], s.mean().tolist())
+ self.assertSequenceEqual([1.0, 1.0], s.min().tolist())
+ self.assertSequenceEqual([3.0, 3.0], s.max().tolist())
+ self.assertSequenceEqual([1.0, 1.0], s.sampleStdev().tolist())
if __name__ == "__main__":