Diffstat (limited to 'python')
-rw-r--r--  python/epydoc.conf             |   2
-rw-r--r--  python/pyspark/__init__.py     |  32
-rw-r--r--  python/pyspark/broadcast.py    |  11
-rw-r--r--  python/pyspark/conf.py         | 170
-rw-r--r--  python/pyspark/context.py      |  57
-rw-r--r--  python/pyspark/java_gateway.py |   1
-rwxr-xr-x  python/run-tests               |   3
7 files changed, 245 insertions, 31 deletions
diff --git a/python/epydoc.conf b/python/epydoc.conf
index 0b42e729f8..95a6af0974 100644
--- a/python/epydoc.conf
+++ b/python/epydoc.conf
@@ -34,4 +34,4 @@ private: no
 
 exclude: pyspark.cloudpickle pyspark.worker pyspark.join
          pyspark.java_gateway pyspark.examples pyspark.shell pyspark.test
-         pyspark.rddsampler pyspark.daemon
+         pyspark.rddsampler pyspark.daemon pyspark.mllib._common
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 1f35f6f939..2b2c3a061a 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -20,28 +20,34 @@
 PySpark is the Python API for Spark.
 
 Public classes:
 
-    - L{SparkContext<pyspark.context.SparkContext>}
-        Main entry point for Spark functionality.
-    - L{RDD<pyspark.rdd.RDD>}
-        A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
-    - L{Broadcast<pyspark.broadcast.Broadcast>}
-        A broadcast variable that gets reused across tasks.
-    - L{Accumulator<pyspark.accumulators.Accumulator>}
-        An "add-only" shared variable that tasks can only add values to.
-    - L{SparkFiles<pyspark.files.SparkFiles>}
-        Access files shipped with jobs.
-    - L{StorageLevel<pyspark.storagelevel.StorageLevel>}
-        Finer-grained cache persistence levels.
+  - L{SparkContext<pyspark.context.SparkContext>}
+      Main entry point for Spark functionality.
+  - L{RDD<pyspark.rdd.RDD>}
+      A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
+  - L{Broadcast<pyspark.broadcast.Broadcast>}
+      A broadcast variable that gets reused across tasks.
+  - L{Accumulator<pyspark.accumulators.Accumulator>}
+      An "add-only" shared variable that tasks can only add values to.
+  - L{SparkConf<pyspark.conf.SparkConf>}
+      For configuring Spark.
+  - L{SparkFiles<pyspark.files.SparkFiles>}
+      Access files shipped with jobs.
+  - L{StorageLevel<pyspark.storagelevel.StorageLevel>}
+      Finer-grained cache persistence levels.
 """
+
+
+
 import sys
 import os
 sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.egg"))
 
+from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
 from pyspark.rdd import RDD
 from pyspark.files import SparkFiles
 from pyspark.storagelevel import StorageLevel
 
 
-__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel"]
+__all__ = ["SparkConf", "SparkContext", "RDD", "SparkFiles", "StorageLevel"]
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index dfdaba274f..43f40f8783 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -45,7 +45,18 @@ def _from_id(bid):
 
 
 class Broadcast(object):
+    """
+    A broadcast variable created with
+    L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}.
+    Access its value through C{.value}.
+    """
+
     def __init__(self, bid, value, java_broadcast=None, pickle_registry=None):
+        """
+        Should not be called directly by users -- use
+        L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}
+        instead.
+        """
         self.value = value
         self.bid = bid
         self._jbroadcast = java_broadcast
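The new Broadcast docstrings describe a create-then-read pattern. A minimal usage sketch, not part of the patch (assumes a local master and the Python 2 syntax this codebase targets):

    from pyspark import SparkContext

    sc = SparkContext("local", "broadcast-demo")
    # Created via SparkContext.broadcast(), never by calling Broadcast() directly.
    lookup = sc.broadcast({"a": 1, "b": 2})
    # Tasks read the shared value through .value, per the new class docstring.
    print sc.parallelize(["a", "b", "a"]).map(lambda k: lookup.value[k]).collect()
    # [1, 2, 1]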
diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py
new file mode 100644
index 0000000000..9dcdcfaa67
--- /dev/null
+++ b/python/pyspark/conf.py
@@ -0,0 +1,170 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+>>> from pyspark.conf import SparkConf
+>>> from pyspark.context import SparkContext
+>>> conf = SparkConf()
+>>> conf.setMaster("local").setAppName("My app")
+<pyspark.conf.SparkConf object at ...>
+>>> conf.get("spark.master")
+u'local'
+>>> conf.get("spark.app.name")
+u'My app'
+>>> sc = SparkContext(conf=conf)
+>>> sc.master
+u'local'
+>>> sc.appName
+u'My app'
+>>> sc.sparkHome == None
+True
+
+>>> conf = SparkConf()
+>>> conf.setSparkHome("/path")
+<pyspark.conf.SparkConf object at ...>
+>>> conf.get("spark.home")
+u'/path'
+>>> conf.setExecutorEnv("VAR1", "value1")
+<pyspark.conf.SparkConf object at ...>
+>>> conf.setExecutorEnv(pairs = [("VAR3", "value3"), ("VAR4", "value4")])
+<pyspark.conf.SparkConf object at ...>
+>>> conf.get("spark.executorEnv.VAR1")
+u'value1'
+>>> print conf.toDebugString()
+spark.executorEnv.VAR1=value1
+spark.executorEnv.VAR3=value3
+spark.executorEnv.VAR4=value4
+spark.home=/path
+>>> sorted(conf.getAll(), key=lambda p: p[0])
+[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), (u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')]
+"""
+
+
+class SparkConf(object):
+    """
+    Configuration for a Spark application. Used to set various Spark
+    parameters as key-value pairs.
+
+    Most of the time, you would create a SparkConf object with
+    C{SparkConf()}, which will load values from C{spark.*} Java system
+    properties and any C{spark.conf} on your Spark classpath. In this
+    case, system properties take priority over C{spark.conf}, and any
+    parameters you set directly on the C{SparkConf} object take priority
+    over both of those.
+
+    For unit tests, you can also call C{SparkConf(False)} to skip
+    loading external settings and get the same configuration no matter
+    what is on the classpath.
+
+    All setter methods in this class support chaining. For example,
+    you can write C{conf.setMaster("local").setAppName("My app")}.
+
+    Note that once a SparkConf object is passed to Spark, it is cloned
+    and can no longer be modified by the user.
+    """
+
+    def __init__(self, loadDefaults=True, _jvm=None):
+        """
+        Create a new Spark configuration.
+
+        @param loadDefaults: whether to load values from Java system
+               properties and classpath (True by default)
+        @param _jvm: internal parameter used to pass a handle to the
+               Java VM; does not need to be set by users
+        """
+        from pyspark.context import SparkContext
+        SparkContext._ensure_initialized()
+        _jvm = _jvm or SparkContext._jvm
+        self._jconf = _jvm.SparkConf(loadDefaults)
+
+    def set(self, key, value):
+        """Set a configuration property."""
+        self._jconf.set(key, value)
+        return self
+
+    def setMaster(self, value):
+        """Set master URL to connect to."""
+        self._jconf.setMaster(value)
+        return self
+
+    def setAppName(self, value):
+        """Set application name."""
+        self._jconf.setAppName(value)
+        return self
+
+    def setSparkHome(self, value):
+        """Set path where Spark is installed on worker nodes."""
+        self._jconf.setSparkHome(value)
+        return self
+
+    def setExecutorEnv(self, key=None, value=None, pairs=None):
+        """Set an environment variable to be passed to executors."""
+        if (key != None and pairs != None) or (key == None and pairs == None):
+            raise Exception("Either pass one key-value pair or a list of pairs")
+        elif key != None:
+            self._jconf.setExecutorEnv(key, value)
+        elif pairs != None:
+            for (k, v) in pairs:
+                self._jconf.setExecutorEnv(k, v)
+        return self
+
+    def setAll(self, pairs):
+        """
+        Set multiple parameters, passed as a list of key-value pairs.
+
+        @param pairs: list of key-value pairs to set
+        """
+        for (k, v) in pairs:
+            self._jconf.set(k, v)
+        return self
+
+    def get(self, key):
+        """Get the configured value for some key, if set."""
+        return self._jconf.get(key)
+
+    def getOrElse(self, key, defaultValue):
+        """Get the value for some key, or return a default otherwise."""
+        return self._jconf.getOrElse(key, defaultValue)
+
+    def getAll(self):
+        """Get all values as a list of key-value pairs."""
+        pairs = []
+        for elem in self._jconf.getAll():
+            pairs.append((elem._1(), elem._2()))
+        return pairs
+
+    def contains(self, key):
+        """Does this configuration contain a given key?"""
+        return self._jconf.contains(key)
+
+    def toDebugString(self):
+        """
+        Returns a printable version of the configuration, as a list of
+        key=value pairs, one per line.
+        """
+        return self._jconf.toDebugString()
+
+
+def _test():
+    import doctest
+    (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
+    if failure_count:
+        exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
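A short sketch of how the chaining setters and getters above compose, not part of the patch (key names other than spark.executor.memory are made up for illustration):

    from pyspark.conf import SparkConf

    conf = (SparkConf()                  # loads spark.* system properties by default
            .setMaster("local[2]")       # setters return self, so calls chain
            .setAppName("conf-demo")
            .set("spark.executor.memory", "1g"))

    print conf.toDebugString()           # one key=value pair per line
    assert conf.contains("spark.executor.memory")
    assert conf.getOrElse("spark.some.unset.key", "fallback") == "fallback"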
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 108f36576a..d77dd76765 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -24,6 +24,7 @@ from tempfile import NamedTemporaryFile
 from pyspark import accumulators
 from pyspark.accumulators import Accumulator
 from pyspark.broadcast import Broadcast
+from pyspark.conf import SparkConf
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, MUTF8Deserializer
@@ -49,14 +50,15 @@ class SparkContext(object):
 
     _python_includes = None # zip and egg files that need to be added to PYTHONPATH
 
-    def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
-        environment=None, batchSize=1024, serializer=PickleSerializer()):
+    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
+        environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None):
         """
-        Create a new SparkContext.
+        Create a new SparkContext. At least the master and app name should be set,
+        either through the named parameters here or through C{conf}.
 
         @param master: Cluster URL to connect to
               (e.g. mesos://host:port, spark://host:port, local[4]).
-        @param jobName: A name for your job, to display on the cluster web UI
+        @param appName: A name for your job, to display on the cluster web UI.
         @param sparkHome: Location where Spark is installed on cluster nodes.
         @param pyFiles: Collection of .zip or .py files to send to the cluster
               and add to PYTHONPATH.  These can be paths on the local file
@@ -67,6 +69,7 @@ class SparkContext(object):
               Java object.  Set 1 to disable batching or -1 to use an
               unlimited batch size.
         @param serializer: The serializer for RDDs.
+        @param conf: A L{SparkConf} object setting Spark properties.
 
 
        >>> from pyspark.context import SparkContext
@@ -79,10 +82,8 @@ class SparkContext(object):
         """
         SparkContext._ensure_initialized(self)
 
-        self.master = master
-        self.jobName = jobName
-        self.sparkHome = sparkHome or None # None becomes null in Py4J
         self.environment = environment or {}
+        self._conf = conf or SparkConf(_jvm=self._jvm)
         self._batchSize = batchSize  # -1 represents an unlimited batch size
         self._unbatched_serializer = serializer
         if batchSize == 1:
@@ -91,10 +92,33 @@ class SparkContext(object):
             self.serializer = BatchedSerializer(self._unbatched_serializer,
                                                 batchSize)
 
+        # Set parameters passed directly to us on the conf; these operations will be
+        # no-ops if the parameters were None
+        self._conf.setMaster(master)
+        self._conf.setAppName(appName)
+        self._conf.setSparkHome(sparkHome)
+        if environment:
+            for key, value in environment.iteritems():
+                self._conf.setExecutorEnv(key, value)
+
+        # Check that we have at least the required parameters
+        if not self._conf.contains("spark.master"):
+            raise Exception("A master URL must be set in your configuration")
+        if not self._conf.contains("spark.app.name"):
+            raise Exception("An application name must be set in your configuration")
+
+        # Read back our properties from the conf in case we loaded some of them from
+        # the classpath or an external config file
+        self.master = self._conf.get("spark.master")
+        self.appName = self._conf.get("spark.app.name")
+        self.sparkHome = self._conf.getOrElse("spark.home", None)
+        for (k, v) in self._conf.getAll():
+            if k.startswith("spark.executorEnv."):
+                varName = k[len("spark.executorEnv."):]
+                self.environment[varName] = v
+
         # Create the Java SparkContext through Py4J
-        empty_string_array = self._gateway.new_array(self._jvm.String, 0)
-        self._jsc = self._jvm.JavaSparkContext(master, jobName, sparkHome,
-                                               empty_string_array)
+        self._jsc = self._jvm.JavaSparkContext(self._conf._jconf)
 
         # Create a single Accumulator in Java that we'll send all our updates through;
         # they will be passed back to us through a TCP server
@@ -105,6 +129,7 @@ class SparkContext(object):
             self._jvm.PythonAccumulatorParam(host, port))
 
         self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')
+
         # Broadcast's __reduce__ method stores Broadcast instances here.
         # This allows other code to determine which Broadcast instances have
         # been pickled, so it can determine which Java broadcast objects to
@@ -121,7 +146,7 @@ class SparkContext(object):
                 self.addPyFile(path)
 
         # Create a temporary directory inside spark.local.dir:
-        local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir()
+        local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
         self._temp_dir = \
             self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
 
@@ -131,8 +156,7 @@ class SparkContext(object):
         if not SparkContext._gateway:
             SparkContext._gateway = launch_gateway()
             SparkContext._jvm = SparkContext._gateway.jvm
-            SparkContext._writeToFile = \
-                SparkContext._jvm.PythonRDD.writeToFile
+            SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
 
         if instance:
             if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
@@ -143,8 +167,8 @@ class SparkContext(object):
     @classmethod
     def setSystemProperty(cls, key, value):
         """
-        Set a system property, such as spark.executor.memory. This must be
-        invoked before instantiating SparkContext.
+        Set a Java system property, such as spark.executor.memory. This must
+        be invoked before instantiating SparkContext.
         """
         SparkContext._ensure_initialized()
         SparkContext._jvm.java.lang.System.setProperty(key, value)
@@ -243,7 +267,8 @@ class SparkContext(object):
 
     def broadcast(self, value):
         """
-        Broadcast a read-only variable to the cluster, returning a C{Broadcast}
+        Broadcast a read-only variable to the cluster, returning a
+        L{Broadcast<pyspark.broadcast.Broadcast>}
         object for reading it in distributed functions. The variable will be
         sent to each cluster only once.
         """
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index eb79135b9d..d8ca9fce00 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -60,6 +60,7 @@ def launch_gateway():
     # Connect to the gateway
     gateway = JavaGateway(GatewayClient(port=port), auto_convert=False)
     # Import the classes used by PySpark
+    java_import(gateway.jvm, "org.apache.spark.SparkConf")
     java_import(gateway.jvm, "org.apache.spark.api.java.*")
     java_import(gateway.jvm, "org.apache.spark.api.python.*")
     java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
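The added java_import is what makes the JVM class reachable by its short name, which the Python wrapper relies on. An illustrative sketch, not part of the patch:

    from pyspark.java_gateway import launch_gateway

    gateway = launch_gateway()
    # Thanks to the org.apache.spark import above, the class resolves directly
    # on gateway.jvm; SparkConf.__init__ calls _jvm.SparkConf(loadDefaults).
    jconf = gateway.jvm.SparkConf(True)
    print jconf.toDebugString()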
diff --git a/python/run-tests b/python/run-tests
index d4dad672d2..4b71fff7c1 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -29,12 +29,13 @@ FAILED=0
 rm -f unit-tests.log
 
 function run_test() {
-    $FWDIR/pyspark $1 2>&1 | tee -a unit-tests.log
+    SPARK_TESTING=1 $FWDIR/pyspark $1 2>&1 | tee -a unit-tests.log
     FAILED=$((PIPESTATUS[0]||$FAILED))
 }
 
 run_test "pyspark/rdd.py"
 run_test "pyspark/context.py"
+run_test "pyspark/conf.py"
 run_test "-m doctest pyspark/broadcast.py"
 run_test "-m doctest pyspark/accumulators.py"
 run_test "-m doctest pyspark/serializers.py"
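run_test pipes each file through the pyspark launcher so doctests execute against a live gateway. An equivalent manual invocation for the new module, as a sketch using the stock doctest API (still needs SPARK_HOME and the launcher environment):

    import doctest
    import pyspark.conf

    # Mirrors what run_test "pyspark/conf.py" exercises via _test().
    results = doctest.testmod(pyspark.conf, optionflags=doctest.ELLIPSIS)
    print "%d of %d doctest(s) failed" % (results.failed, results.attempted)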