author     Patrick Wendell <pwendell@gmail.com>    2014-01-01 21:29:12 -0800
committer  Patrick Wendell <pwendell@gmail.com>    2014-01-01 21:29:12 -0800
commit     3713f8129a618a633a7aca8c944960c3e7ac9d3b (patch)
tree       ff3aa8fa3460078007259a6a6479dc4aec27b50a /python
parent     c1d928a897f8daed5d7e74f4af476b67046f348d (diff)
parent     7e8d2e8a5c88d16c771923504c433491b109ab2a (diff)
Merge pull request #309 from mateiz/conf2
SPARK-544. Migrate configuration to a SparkConf class

This is still a work in progress based on Prashant and Evan's code. So far I've done the following:

- Got rid of global SparkContext.globalConf
- Passed SparkConf to serializers and compression codecs
- Made SparkConf public instead of private[spark]
- Improved API of SparkContext and SparkConf
- Switched executor environment vars to be passed through SparkConf
- Fixed some places that were still using system properties
- Fixed some tests, though others are still failing

This still fails several tests in core, repl and streaming, likely due to properties not being set or cleared correctly (some of the tests run fine in isolation). But the API at least is hopefully ready for review. Unfortunately there was a lot of global stuff before due to a "SparkContext.globalConf" method that let you set a "default" configuration of sorts, which meant I had to make some pretty big changes.
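For a quick sense of what the new Python-side API looks like, here is a minimal sketch distilled from the doctest added in python/pyspark/conf.py below (the master URL, app name, and environment variable are placeholder values):

    from pyspark.conf import SparkConf
    from pyspark.context import SparkContext

    # Every setter returns self, so calls can be chained.
    conf = (SparkConf()
            .setMaster("local")                 # stored under "spark.master"
            .setAppName("My app")               # stored under "spark.app.name"
            .setExecutorEnv("VAR1", "value1"))  # stored under "spark.executorEnv.VAR1"

    # SparkContext now accepts a conf instead of requiring positional
    # master/jobName arguments.
    sc = SparkContext(conf=conf)
    print sc.master    # u'local', read back from the conf
    print sc.appName   # u'My app'

Note that once the conf has been passed to a SparkContext it is cloned on the JVM side, so further changes to the Python SparkConf object have no effect.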
Diffstat (limited to 'python')
-rw-r--r--  python/epydoc.conf                2
-rw-r--r--  python/pyspark/__init__.py       32
-rw-r--r--  python/pyspark/broadcast.py      11
-rw-r--r--  python/pyspark/conf.py          171
-rw-r--r--  python/pyspark/context.py        59
-rw-r--r--  python/pyspark/java_gateway.py    1
-rwxr-xr-x  python/run-tests                  3
7 files changed, 248 insertions, 31 deletions
diff --git a/python/epydoc.conf b/python/epydoc.conf
index 0b42e729f8..95a6af0974 100644
--- a/python/epydoc.conf
+++ b/python/epydoc.conf
@@ -34,4 +34,4 @@ private: no
exclude: pyspark.cloudpickle pyspark.worker pyspark.join
pyspark.java_gateway pyspark.examples pyspark.shell pyspark.tests
- pyspark.rddsampler pyspark.daemon
+ pyspark.rddsampler pyspark.daemon pyspark.mllib._common
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 1f35f6f939..2b2c3a061a 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -20,28 +20,34 @@ PySpark is the Python API for Spark.
Public classes:
- - L{SparkContext<pyspark.context.SparkContext>}
- Main entry point for Spark functionality.
- - L{RDD<pyspark.rdd.RDD>}
- A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
- - L{Broadcast<pyspark.broadcast.Broadcast>}
- A broadcast variable that gets reused across tasks.
- - L{Accumulator<pyspark.accumulators.Accumulator>}
- An "add-only" shared variable that tasks can only add values to.
- - L{SparkFiles<pyspark.files.SparkFiles>}
- Access files shipped with jobs.
- - L{StorageLevel<pyspark.storagelevel.StorageLevel>}
- Finer-grained cache persistence levels.
+ - L{SparkContext<pyspark.context.SparkContext>}
+ Main entry point for Spark functionality.
+ - L{RDD<pyspark.rdd.RDD>}
+ A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
+ - L{Broadcast<pyspark.broadcast.Broadcast>}
+ A broadcast variable that gets reused across tasks.
+ - L{Accumulator<pyspark.accumulators.Accumulator>}
+ An "add-only" shared variable that tasks can only add values to.
+ - L{SparkConf<pyspark.conf.SparkConf>}
+ For configuring Spark.
+ - L{SparkFiles<pyspark.files.SparkFiles>}
+ Access files shipped with jobs.
+ - L{StorageLevel<pyspark.storagelevel.StorageLevel>}
+ Finer-grained cache persistence levels.
"""
+
+
+
import sys
import os
sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.egg"))
+from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.rdd import RDD
from pyspark.files import SparkFiles
from pyspark.storagelevel import StorageLevel
-__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel"]
+__all__ = ["SparkConf", "SparkContext", "RDD", "SparkFiles", "StorageLevel"]
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index dfdaba274f..43f40f8783 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -45,7 +45,18 @@ def _from_id(bid):
class Broadcast(object):
+ """
+ A broadcast variable created with
+ L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}.
+ Access its value through C{.value}.
+ """
+
def __init__(self, bid, value, java_broadcast=None, pickle_registry=None):
+ """
+ Should not be called directly by users -- use
+ L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}
+ instead.
+ """
self.value = value
self.bid = bid
self._jbroadcast = java_broadcast
diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py
new file mode 100644
index 0000000000..d72aed6a30
--- /dev/null
+++ b/python/pyspark/conf.py
@@ -0,0 +1,171 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+>>> from pyspark.conf import SparkConf
+>>> from pyspark.context import SparkContext
+>>> conf = SparkConf()
+>>> conf.setMaster("local").setAppName("My app")
+<pyspark.conf.SparkConf object at ...>
+>>> conf.get("spark.master")
+u'local'
+>>> conf.get("spark.app.name")
+u'My app'
+>>> sc = SparkContext(conf=conf)
+>>> sc.master
+u'local'
+>>> sc.appName
+u'My app'
+>>> sc.sparkHome == None
+True
+
+>>> conf = SparkConf()
+>>> conf.setSparkHome("/path")
+<pyspark.conf.SparkConf object at ...>
+>>> conf.get("spark.home")
+u'/path'
+>>> conf.setExecutorEnv("VAR1", "value1")
+<pyspark.conf.SparkConf object at ...>
+>>> conf.setExecutorEnv(pairs = [("VAR3", "value3"), ("VAR4", "value4")])
+<pyspark.conf.SparkConf object at ...>
+>>> conf.get("spark.executorEnv.VAR1")
+u'value1'
+>>> print conf.toDebugString()
+spark.executorEnv.VAR1=value1
+spark.executorEnv.VAR3=value3
+spark.executorEnv.VAR4=value4
+spark.home=/path
+>>> sorted(conf.getAll(), key=lambda p: p[0])
+[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), (u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')]
+"""
+
+
+class SparkConf(object):
+ """
+ Configuration for a Spark application. Used to set various Spark
+ parameters as key-value pairs.
+
+ Most of the time, you would create a SparkConf object with
+ C{SparkConf()}, which will load values from C{spark.*} Java system
+ properties and any C{spark.conf} on your Spark classpath. In this
+ case, system properties take priority over C{spark.conf}, and any
+ parameters you set directly on the C{SparkConf} object take priority
+ over both of those.
+
+ For unit tests, you can also call C{SparkConf(false)} to skip
+ loading external settings and get the same configuration no matter
+ what is on the classpath.
+
+ All setter methods in this class support chaining. For example,
+ you can write C{conf.setMaster("local").setAppName("My app")}.
+
+ Note that once a SparkConf object is passed to Spark, it is cloned
+ and can no longer be modified by the user.
+ """
+
+ def __init__(self, loadDefaults=True, _jvm=None):
+ """
+ Create a new Spark configuration.
+
+ @param loadDefaults: whether to load values from Java system
+ properties and classpath (True by default)
+ @param _jvm: internal parameter used to pass a handle to the
+ Java VM; does not need to be set by users
+ """
+ from pyspark.context import SparkContext
+ SparkContext._ensure_initialized()
+ _jvm = _jvm or SparkContext._jvm
+ self._jconf = _jvm.SparkConf(loadDefaults)
+
+ def set(self, key, value):
+ """Set a configuration property."""
+ self._jconf.set(key, unicode(value))
+ return self
+
+ def setMaster(self, value):
+ """Set master URL to connect to."""
+ self._jconf.setMaster(value)
+ return self
+
+ def setAppName(self, value):
+ """Set application name."""
+ self._jconf.setAppName(value)
+ return self
+
+ def setSparkHome(self, value):
+ """Set path where Spark is installed on worker nodes."""
+ self._jconf.setSparkHome(value)
+ return self
+
+ def setExecutorEnv(self, key=None, value=None, pairs=None):
+ """Set an environment variable to be passed to executors."""
+ if (key != None and pairs != None) or (key == None and pairs == None):
+ raise Exception("Either pass one key-value pair or a list of pairs")
+ elif key != None:
+ self._jconf.setExecutorEnv(key, value)
+ elif pairs != None:
+ for (k, v) in pairs:
+ self._jconf.setExecutorEnv(k, v)
+ return self
+
+ def setAll(self, pairs):
+ """
+ Set multiple parameters, passed as a list of key-value pairs.
+
+ @param pairs: list of key-value pairs to set
+ """
+ for (k, v) in pairs:
+ self._jconf.set(k, v)
+ return self
+
+ def get(self, key, defaultValue=None):
+ """Get the configured value for some key, or return a default otherwise."""
+ if defaultValue == None: # Py4J doesn't call the right get() if we pass None
+ if not self._jconf.contains(key):
+ return None
+ return self._jconf.get(key)
+ else:
+ return self._jconf.get(key, defaultValue)
+
+ def getAll(self):
+ """Get all values as a list of key-value pairs."""
+ pairs = []
+ for elem in self._jconf.getAll():
+ pairs.append((elem._1(), elem._2()))
+ return pairs
+
+ def contains(self, key):
+ """Does this configuration contain a given key?"""
+ return self._jconf.contains(key)
+
+ def toDebugString(self):
+ """
+ Returns a printable version of the configuration, as a list of
+ key=value pairs, one per line.
+ """
+ return self._jconf.toDebugString()
+
+
+def _test():
+ import doctest
+ (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
+ if failure_count:
+ exit(-1)
+
+
+if __name__ == "__main__":
+ _test()
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 108f36576a..f955aad7a4 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -24,6 +24,7 @@ from tempfile import NamedTemporaryFile
from pyspark import accumulators
from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast
+from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, MUTF8Deserializer
@@ -49,14 +50,15 @@ class SparkContext(object):
_python_includes = None # zip and egg files that need to be added to PYTHONPATH
- def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
- environment=None, batchSize=1024, serializer=PickleSerializer()):
+ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
+ environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None):
"""
- Create a new SparkContext.
+ Create a new SparkContext. At least the master and app name should be set,
+ either through the named parameters here or through C{conf}.
@param master: Cluster URL to connect to
(e.g. mesos://host:port, spark://host:port, local[4]).
- @param jobName: A name for your job, to display on the cluster web UI
+ @param appName: A name for your job, to display on the cluster web UI.
@param sparkHome: Location where Spark is installed on cluster nodes.
@param pyFiles: Collection of .zip or .py files to send to the cluster
and add to PYTHONPATH. These can be paths on the local file
@@ -67,6 +69,7 @@ class SparkContext(object):
Java object. Set 1 to disable batching or -1 to use an
unlimited batch size.
@param serializer: The serializer for RDDs.
+ @param conf: A L{SparkConf} object setting Spark properties.
>>> from pyspark.context import SparkContext
@@ -79,10 +82,8 @@ class SparkContext(object):
"""
SparkContext._ensure_initialized(self)
- self.master = master
- self.jobName = jobName
- self.sparkHome = sparkHome or None # None becomes null in Py4J
self.environment = environment or {}
+ self._conf = conf or SparkConf(_jvm=self._jvm)
self._batchSize = batchSize # -1 represents an unlimited batch size
self._unbatched_serializer = serializer
if batchSize == 1:
@@ -91,10 +92,35 @@ class SparkContext(object):
self.serializer = BatchedSerializer(self._unbatched_serializer,
batchSize)
+ # Set any parameters passed directly to us on the conf
+ if master:
+ self._conf.setMaster(master)
+ if appName:
+ self._conf.setAppName(appName)
+ if sparkHome:
+ self._conf.setSparkHome(sparkHome)
+ if environment:
+ for key, value in environment.iteritems():
+ self._conf.setExecutorEnv(key, value)
+
+ # Check that we have at least the required parameters
+ if not self._conf.contains("spark.master"):
+ raise Exception("A master URL must be set in your configuration")
+ if not self._conf.contains("spark.app.name"):
+ raise Exception("An application name must be set in your configuration")
+
+ # Read back our properties from the conf in case we loaded some of them from
+ # the classpath or an external config file
+ self.master = self._conf.get("spark.master")
+ self.appName = self._conf.get("spark.app.name")
+ self.sparkHome = self._conf.get("spark.home", None)
+ for (k, v) in self._conf.getAll():
+ if k.startswith("spark.executorEnv."):
+ varName = k[len("spark.executorEnv."):]
+ self.environment[varName] = v
+
# Create the Java SparkContext through Py4J
- empty_string_array = self._gateway.new_array(self._jvm.String, 0)
- self._jsc = self._jvm.JavaSparkContext(master, jobName, sparkHome,
- empty_string_array)
+ self._jsc = self._jvm.JavaSparkContext(self._conf._jconf)
# Create a single Accumulator in Java that we'll send all our updates through;
# they will be passed back to us through a TCP server
@@ -105,6 +131,7 @@ class SparkContext(object):
self._jvm.PythonAccumulatorParam(host, port))
self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')
+
# Broadcast's __reduce__ method stores Broadcast instances here.
# This allows other code to determine which Broadcast instances have
# been pickled, so it can determine which Java broadcast objects to
@@ -121,7 +148,7 @@ class SparkContext(object):
self.addPyFile(path)
# Create a temporary directory inside spark.local.dir:
- local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir()
+ local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
self._temp_dir = \
self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
@@ -131,8 +158,7 @@ class SparkContext(object):
if not SparkContext._gateway:
SparkContext._gateway = launch_gateway()
SparkContext._jvm = SparkContext._gateway.jvm
- SparkContext._writeToFile = \
- SparkContext._jvm.PythonRDD.writeToFile
+ SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
if instance:
if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
@@ -143,8 +169,8 @@ class SparkContext(object):
@classmethod
def setSystemProperty(cls, key, value):
"""
- Set a system property, such as spark.executor.memory. This must be
- invoked before instantiating SparkContext.
+ Set a Java system property, such as spark.executor.memory. This
+ must be invoked before instantiating SparkContext.
"""
SparkContext._ensure_initialized()
SparkContext._jvm.java.lang.System.setProperty(key, value)
@@ -243,7 +269,8 @@ class SparkContext(object):
def broadcast(self, value):
"""
- Broadcast a read-only variable to the cluster, returning a C{Broadcast}
+ Broadcast a read-only variable to the cluster, returning a
+ L{Broadcast<pyspark.broadcast.Broadcast>}
object for reading it in distributed functions. The variable will be
sent to each cluster only once.
"""
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index eb79135b9d..d8ca9fce00 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -60,6 +60,7 @@ def launch_gateway():
# Connect to the gateway
gateway = JavaGateway(GatewayClient(port=port), auto_convert=False)
# Import the classes used by PySpark
+ java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
diff --git a/python/run-tests b/python/run-tests
index d4dad672d2..4b71fff7c1 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -29,12 +29,13 @@ FAILED=0
rm -f unit-tests.log
function run_test() {
- $FWDIR/pyspark $1 2>&1 | tee -a unit-tests.log
+ SPARK_TESTING=0 $FWDIR/pyspark $1 2>&1 | tee -a unit-tests.log
FAILED=$((PIPESTATUS[0]||$FAILED))
}
run_test "pyspark/rdd.py"
run_test "pyspark/context.py"
+run_test "pyspark/conf.py"
run_test "-m doctest pyspark/broadcast.py"
run_test "-m doctest pyspark/accumulators.py"
run_test "-m doctest pyspark/serializers.py"