diff options
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/__init__.py | 34 | ||||
-rw-r--r-- | python/pyspark/broadcast.py | 11 | ||||
-rw-r--r-- | python/pyspark/conf.py | 171 | ||||
-rw-r--r-- | python/pyspark/context.py | 59 | ||||
-rw-r--r-- | python/pyspark/java_gateway.py | 3 | ||||
-rw-r--r-- | python/pyspark/mllib/_common.py | 25 | ||||
-rw-r--r-- | python/pyspark/mllib/recommendation.py | 12 | ||||
-rw-r--r-- | python/pyspark/rdd.py | 66 | ||||
-rw-r--r-- | python/pyspark/shell.py | 2 |
9 files changed, 339 insertions, 44 deletions
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index 1f35f6f939..a51d5af79b 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -20,28 +20,34 @@ PySpark is the Python API for Spark. Public classes: - - L{SparkContext<pyspark.context.SparkContext>} - Main entry point for Spark functionality. - - L{RDD<pyspark.rdd.RDD>} - A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. - - L{Broadcast<pyspark.broadcast.Broadcast>} - A broadcast variable that gets reused across tasks. - - L{Accumulator<pyspark.accumulators.Accumulator>} - An "add-only" shared variable that tasks can only add values to. - - L{SparkFiles<pyspark.files.SparkFiles>} - Access files shipped with jobs. - - L{StorageLevel<pyspark.storagelevel.StorageLevel>} - Finer-grained cache persistence levels. + - L{SparkContext<pyspark.context.SparkContext>} + Main entry point for Spark functionality. + - L{RDD<pyspark.rdd.RDD>} + A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. + - L{Broadcast<pyspark.broadcast.Broadcast>} + A broadcast variable that gets reused across tasks. + - L{Accumulator<pyspark.accumulators.Accumulator>} + An "add-only" shared variable that tasks can only add values to. + - L{SparkConf<pyspark.conf.SparkConf>} + For configuring Spark. + - L{SparkFiles<pyspark.files.SparkFiles>} + Access files shipped with jobs. + - L{StorageLevel<pyspark.storagelevel.StorageLevel>} + Finer-grained cache persistence levels. """ + + + import sys import os -sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.egg")) +sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j-0.8.1-src.zip")) +from pyspark.conf import SparkConf from pyspark.context import SparkContext from pyspark.rdd import RDD from pyspark.files import SparkFiles from pyspark.storagelevel import StorageLevel -__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel"] +__all__ = ["SparkConf", "SparkContext", "RDD", "SparkFiles", "StorageLevel"] diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index dfdaba274f..43f40f8783 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -45,7 +45,18 @@ def _from_id(bid): class Broadcast(object): + """ + A broadcast variable created with + L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}. + Access its value through C{.value}. + """ + def __init__(self, bid, value, java_broadcast=None, pickle_registry=None): + """ + Should not be called directly by users -- use + L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>} + instead. + """ self.value = value self.bid = bid self._jbroadcast = java_broadcast diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py new file mode 100644 index 0000000000..d72aed6a30 --- /dev/null +++ b/python/pyspark/conf.py @@ -0,0 +1,171 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +>>> from pyspark.conf import SparkConf +>>> from pyspark.context import SparkContext +>>> conf = SparkConf() +>>> conf.setMaster("local").setAppName("My app") +<pyspark.conf.SparkConf object at ...> +>>> conf.get("spark.master") +u'local' +>>> conf.get("spark.app.name") +u'My app' +>>> sc = SparkContext(conf=conf) +>>> sc.master +u'local' +>>> sc.appName +u'My app' +>>> sc.sparkHome == None +True + +>>> conf = SparkConf() +>>> conf.setSparkHome("/path") +<pyspark.conf.SparkConf object at ...> +>>> conf.get("spark.home") +u'/path' +>>> conf.setExecutorEnv("VAR1", "value1") +<pyspark.conf.SparkConf object at ...> +>>> conf.setExecutorEnv(pairs = [("VAR3", "value3"), ("VAR4", "value4")]) +<pyspark.conf.SparkConf object at ...> +>>> conf.get("spark.executorEnv.VAR1") +u'value1' +>>> print conf.toDebugString() +spark.executorEnv.VAR1=value1 +spark.executorEnv.VAR3=value3 +spark.executorEnv.VAR4=value4 +spark.home=/path +>>> sorted(conf.getAll(), key=lambda p: p[0]) +[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), (u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')] +""" + + +class SparkConf(object): + """ + Configuration for a Spark application. Used to set various Spark + parameters as key-value pairs. + + Most of the time, you would create a SparkConf object with + C{SparkConf()}, which will load values from C{spark.*} Java system + properties and any C{spark.conf} on your Spark classpath. In this + case, system properties take priority over C{spark.conf}, and any + parameters you set directly on the C{SparkConf} object take priority + over both of those. + + For unit tests, you can also call C{SparkConf(false)} to skip + loading external settings and get the same configuration no matter + what is on the classpath. + + All setter methods in this class support chaining. For example, + you can write C{conf.setMaster("local").setAppName("My app")}. + + Note that once a SparkConf object is passed to Spark, it is cloned + and can no longer be modified by the user. + """ + + def __init__(self, loadDefaults=True, _jvm=None): + """ + Create a new Spark configuration. + + @param loadDefaults: whether to load values from Java system + properties and classpath (True by default) + @param _jvm: internal parameter used to pass a handle to the + Java VM; does not need to be set by users + """ + from pyspark.context import SparkContext + SparkContext._ensure_initialized() + _jvm = _jvm or SparkContext._jvm + self._jconf = _jvm.SparkConf(loadDefaults) + + def set(self, key, value): + """Set a configuration property.""" + self._jconf.set(key, unicode(value)) + return self + + def setMaster(self, value): + """Set master URL to connect to.""" + self._jconf.setMaster(value) + return self + + def setAppName(self, value): + """Set application name.""" + self._jconf.setAppName(value) + return self + + def setSparkHome(self, value): + """Set path where Spark is installed on worker nodes.""" + self._jconf.setSparkHome(value) + return self + + def setExecutorEnv(self, key=None, value=None, pairs=None): + """Set an environment variable to be passed to executors.""" + if (key != None and pairs != None) or (key == None and pairs == None): + raise Exception("Either pass one key-value pair or a list of pairs") + elif key != None: + self._jconf.setExecutorEnv(key, value) + elif pairs != None: + for (k, v) in pairs: + self._jconf.setExecutorEnv(k, v) + return self + + def setAll(self, pairs): + """ + Set multiple parameters, passed as a list of key-value pairs. + + @param pairs: list of key-value pairs to set + """ + for (k, v) in pairs: + self._jconf.set(k, v) + return self + + def get(self, key, defaultValue=None): + """Get the configured value for some key, or return a default otherwise.""" + if defaultValue == None: # Py4J doesn't call the right get() if we pass None + if not self._jconf.contains(key): + return None + return self._jconf.get(key) + else: + return self._jconf.get(key, defaultValue) + + def getAll(self): + """Get all values as a list of key-value pairs.""" + pairs = [] + for elem in self._jconf.getAll(): + pairs.append((elem._1(), elem._2())) + return pairs + + def contains(self, key): + """Does this configuration contain a given key?""" + return self._jconf.contains(key) + + def toDebugString(self): + """ + Returns a printable version of the configuration, as a list of + key=value pairs, one per line. + """ + return self._jconf.toDebugString() + + +def _test(): + import doctest + (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS) + if failure_count: + exit(-1) + + +if __name__ == "__main__": + _test() diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 108f36576a..f955aad7a4 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -24,6 +24,7 @@ from tempfile import NamedTemporaryFile from pyspark import accumulators from pyspark.accumulators import Accumulator from pyspark.broadcast import Broadcast +from pyspark.conf import SparkConf from pyspark.files import SparkFiles from pyspark.java_gateway import launch_gateway from pyspark.serializers import PickleSerializer, BatchedSerializer, MUTF8Deserializer @@ -49,14 +50,15 @@ class SparkContext(object): _python_includes = None # zip and egg files that need to be added to PYTHONPATH - def __init__(self, master, jobName, sparkHome=None, pyFiles=None, - environment=None, batchSize=1024, serializer=PickleSerializer()): + def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, + environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None): """ - Create a new SparkContext. + Create a new SparkContext. At least the master and app name should be set, + either through the named parameters here or through C{conf}. @param master: Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). - @param jobName: A name for your job, to display on the cluster web UI + @param appName: A name for your job, to display on the cluster web UI. @param sparkHome: Location where Spark is installed on cluster nodes. @param pyFiles: Collection of .zip or .py files to send to the cluster and add to PYTHONPATH. These can be paths on the local file @@ -67,6 +69,7 @@ class SparkContext(object): Java object. Set 1 to disable batching or -1 to use an unlimited batch size. @param serializer: The serializer for RDDs. + @param conf: A L{SparkConf} object setting Spark properties. >>> from pyspark.context import SparkContext @@ -79,10 +82,8 @@ class SparkContext(object): """ SparkContext._ensure_initialized(self) - self.master = master - self.jobName = jobName - self.sparkHome = sparkHome or None # None becomes null in Py4J self.environment = environment or {} + self._conf = conf or SparkConf(_jvm=self._jvm) self._batchSize = batchSize # -1 represents an unlimited batch size self._unbatched_serializer = serializer if batchSize == 1: @@ -91,10 +92,35 @@ class SparkContext(object): self.serializer = BatchedSerializer(self._unbatched_serializer, batchSize) + # Set any parameters passed directly to us on the conf + if master: + self._conf.setMaster(master) + if appName: + self._conf.setAppName(appName) + if sparkHome: + self._conf.setSparkHome(sparkHome) + if environment: + for key, value in environment.iteritems(): + self._conf.setExecutorEnv(key, value) + + # Check that we have at least the required parameters + if not self._conf.contains("spark.master"): + raise Exception("A master URL must be set in your configuration") + if not self._conf.contains("spark.app.name"): + raise Exception("An application name must be set in your configuration") + + # Read back our properties from the conf in case we loaded some of them from + # the classpath or an external config file + self.master = self._conf.get("spark.master") + self.appName = self._conf.get("spark.app.name") + self.sparkHome = self._conf.get("spark.home", None) + for (k, v) in self._conf.getAll(): + if k.startswith("spark.executorEnv."): + varName = k[len("spark.executorEnv."):] + self.environment[varName] = v + # Create the Java SparkContext through Py4J - empty_string_array = self._gateway.new_array(self._jvm.String, 0) - self._jsc = self._jvm.JavaSparkContext(master, jobName, sparkHome, - empty_string_array) + self._jsc = self._jvm.JavaSparkContext(self._conf._jconf) # Create a single Accumulator in Java that we'll send all our updates through; # they will be passed back to us through a TCP server @@ -105,6 +131,7 @@ class SparkContext(object): self._jvm.PythonAccumulatorParam(host, port)) self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python') + # Broadcast's __reduce__ method stores Broadcast instances here. # This allows other code to determine which Broadcast instances have # been pickled, so it can determine which Java broadcast objects to @@ -121,7 +148,7 @@ class SparkContext(object): self.addPyFile(path) # Create a temporary directory inside spark.local.dir: - local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir() + local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf()) self._temp_dir = \ self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath() @@ -131,8 +158,7 @@ class SparkContext(object): if not SparkContext._gateway: SparkContext._gateway = launch_gateway() SparkContext._jvm = SparkContext._gateway.jvm - SparkContext._writeToFile = \ - SparkContext._jvm.PythonRDD.writeToFile + SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile if instance: if SparkContext._active_spark_context and SparkContext._active_spark_context != instance: @@ -143,8 +169,8 @@ class SparkContext(object): @classmethod def setSystemProperty(cls, key, value): """ - Set a system property, such as spark.executor.memory. This must be - invoked before instantiating SparkContext. + Set a Java system property, such as spark.executor.memory. This must + must be invoked before instantiating SparkContext. """ SparkContext._ensure_initialized() SparkContext._jvm.java.lang.System.setProperty(key, value) @@ -243,7 +269,8 @@ class SparkContext(object): def broadcast(self, value): """ - Broadcast a read-only variable to the cluster, returning a C{Broadcast} + Broadcast a read-only variable to the cluster, returning a + L{Broadcast<pyspark.broadcast.Broadcast>} object for reading it in distributed functions. The variable will be sent to each cluster only once. """ diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index eb79135b9d..c15add5237 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -31,7 +31,7 @@ def launch_gateway(): # Launch the Py4j gateway using Spark's run command so that we pick up the # proper classpath and SPARK_MEM settings from spark-env.sh on_windows = platform.system() == "Windows" - script = "spark-class.cmd" if on_windows else "spark-class" + script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class" command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer", "--die-on-broken-pipe", "0"] if not on_windows: @@ -60,6 +60,7 @@ def launch_gateway(): # Connect to the gateway gateway = JavaGateway(GatewayClient(port=port), auto_convert=False) # Import the classes used by PySpark + java_import(gateway.jvm, "org.apache.spark.SparkConf") java_import(gateway.jvm, "org.apache.spark.api.java.*") java_import(gateway.jvm, "org.apache.spark.api.python.*") java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index e74ba0fabc..769d88dfb9 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -18,6 +18,9 @@ from numpy import ndarray, copyto, float64, int64, int32, ones, array_equal, array, dot, shape from pyspark import SparkContext +from pyspark.serializers import Serializer +import struct + # Double vector format: # # [8-byte 1] [8-byte length] [length*8 bytes of data] @@ -213,6 +216,28 @@ def _serialize_rating(r): intpart[0], intpart[1], doublepart[0] = r return ba +class RatingDeserializer(Serializer): + def loads(self, stream): + length = struct.unpack("!i", stream.read(4))[0] + ba = stream.read(length) + res = ndarray(shape=(3, ), buffer=ba, dtype="float64", offset=4) + return int(res[0]), int(res[1]), res[2] + + def load_stream(self, stream): + while True: + try: + yield self.loads(stream) + except struct.error: + return + except EOFError: + return + +def _serialize_tuple(t): + ba = bytearray(8) + intpart = ndarray(shape=[2], buffer=ba, dtype=int32) + intpart[0], intpart[1] = t + return ba + def _test(): import doctest globs = globals().copy() diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index 14d06cba21..0eeb5bb66b 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -20,7 +20,9 @@ from pyspark.mllib._common import \ _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \ _serialize_double_matrix, _deserialize_double_matrix, \ _serialize_double_vector, _deserialize_double_vector, \ - _get_initial_weights, _serialize_rating, _regression_train_wrapper + _get_initial_weights, _serialize_rating, _regression_train_wrapper, \ + _serialize_tuple, RatingDeserializer +from pyspark.rdd import RDD class MatrixFactorizationModel(object): """A matrix factorisation model trained by regularized alternating @@ -33,6 +35,9 @@ class MatrixFactorizationModel(object): >>> model = ALS.trainImplicit(sc, ratings, 1) >>> model.predict(2,2) is not None True + >>> testset = sc.parallelize([(1, 2), (1, 1)]) + >>> model.predictAll(testset).count == 2 + True """ def __init__(self, sc, java_model): @@ -45,6 +50,11 @@ class MatrixFactorizationModel(object): def predict(self, user, product): return self._java_model.predict(user, product) + def predictAll(self, usersProducts): + usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple) + return RDD(self._java_model.predict(usersProductsJRDD._jrdd), + self._context, RatingDeserializer()) + class ALS(object): @classmethod def train(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1): diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index f87923e6fa..6fb4a7b3be 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -23,6 +23,7 @@ import operator import os import sys import shlex +import traceback from subprocess import Popen, PIPE from tempfile import NamedTemporaryFile from threading import Thread @@ -39,6 +40,46 @@ from py4j.java_collections import ListConverter, MapConverter __all__ = ["RDD"] +def _extract_concise_traceback(): + tb = traceback.extract_stack() + if len(tb) == 0: + return "I'm lost!" + # HACK: This function is in a file called 'rdd.py' in the top level of + # everything PySpark. Just trim off the directory name and assume + # everything in that tree is PySpark guts. + file, line, module, what = tb[len(tb) - 1] + sparkpath = os.path.dirname(file) + first_spark_frame = len(tb) - 1 + for i in range(0, len(tb)): + file, line, fun, what = tb[i] + if file.startswith(sparkpath): + first_spark_frame = i + break + if first_spark_frame == 0: + file, line, fun, what = tb[0] + return "%s at %s:%d" % (fun, file, line) + sfile, sline, sfun, swhat = tb[first_spark_frame] + ufile, uline, ufun, uwhat = tb[first_spark_frame-1] + return "%s at %s:%d" % (sfun, ufile, uline) + +_spark_stack_depth = 0 + +class _JavaStackTrace(object): + def __init__(self, sc): + self._traceback = _extract_concise_traceback() + self._context = sc + + def __enter__(self): + global _spark_stack_depth + if _spark_stack_depth == 0: + self._context._jsc.setCallSite(self._traceback) + _spark_stack_depth += 1 + + def __exit__(self, type, value, tb): + global _spark_stack_depth + _spark_stack_depth -= 1 + if _spark_stack_depth == 0: + self._context._jsc.setCallSite(None) class RDD(object): """ @@ -401,7 +442,8 @@ class RDD(object): """ Return a list that contains all of the elements in this RDD. """ - bytesInJava = self._jrdd.collect().iterator() + with _JavaStackTrace(self.context) as st: + bytesInJava = self._jrdd.collect().iterator() return list(self._collect_iterator_through_file(bytesInJava)) def _collect_iterator_through_file(self, iterator): @@ -582,13 +624,14 @@ class RDD(object): # TODO(shivaram): Similar to the scala implementation, update the take # method to scan multiple splits based on an estimate of how many elements # we have per-split. - for partition in range(mapped._jrdd.splits().size()): - partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1) - partitionsToTake[0] = partition - iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator() - items.extend(mapped._collect_iterator_through_file(iterator)) - if len(items) >= num: - break + with _JavaStackTrace(self.context) as st: + for partition in range(mapped._jrdd.splits().size()): + partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1) + partitionsToTake[0] = partition + iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator() + items.extend(mapped._collect_iterator_through_file(iterator)) + if len(items) >= num: + break return items[:num] def first(self): @@ -765,9 +808,10 @@ class RDD(object): yield outputSerializer.dumps(items) keyed = PipelinedRDD(self, add_shuffle_key) keyed._bypass_serializer = True - pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD() - partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, - id(partitionFunc)) + with _JavaStackTrace(self.context) as st: + pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD() + partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, + id(partitionFunc)) jrdd = pairRDD.partitionBy(partitioner).values() rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer)) # This is required so that id(partitionFunc) remains unique, even if diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index ef07eb437b..1602227a27 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -47,7 +47,7 @@ print "Spark context available as sc." if add_files != None: print "Adding files: [%s]" % ", ".join(add_files) -# The ./pyspark script stores the old PYTHONSTARTUP value in OLD_PYTHONSTARTUP, +# The ./bin/pyspark script stores the old PYTHONSTARTUP value in OLD_PYTHONSTARTUP, # which allows us to execute the user's PYTHONSTARTUP file: _pythonstartup = os.environ.get('OLD_PYTHONSTARTUP') if _pythonstartup and os.path.isfile(_pythonstartup): |