author     Matei Zaharia <matei@databricks.com>  2013-12-29 14:03:39 -0500
committer  Matei Zaharia <matei@databricks.com>  2013-12-29 14:03:39 -0500
commit     cd00225db9b90fc845fd1458831bdd9d014d1bb6 (patch)
tree       6afd53d8482dda409f302c1f2c20d87987c72dba /python/pyspark/context.py
parent     1c11f54a9b7340ccfa7bf7236fbcd210b77ae0a8 (diff)
Add SparkConf support in Python
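
With this change, a SparkContext can be configured through a SparkConf
object as well as through the existing constructor arguments. A minimal
usage sketch based on the API this diff adds (the master URL and app
name are illustrative):

    from pyspark.conf import SparkConf
    from pyspark.context import SparkContext

    # Build the configuration first, then hand it to the context.
    conf = SparkConf().setMaster("local[2]").setAppName("ConfDemo")
    sc = SparkContext(conf=conf)

    # The positional style still works; note that the second parameter
    # is now named appName rather than jobName.
    # sc = SparkContext("local[2]", "ConfDemo")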
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r--  python/pyspark/context.py | 40
1 file changed, 28 insertions, 12 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index c0645b2847..97c1526afd 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -24,6 +24,7 @@ from tempfile import NamedTemporaryFile
from pyspark import accumulators
from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast
+from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, MUTF8Deserializer
@@ -49,14 +50,14 @@ class SparkContext(object):
_python_includes = None # zip and egg files that need to be added to PYTHONPATH
- def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
- environment=None, batchSize=1024, serializer=PickleSerializer()):
+ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
+ environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None):
"""
Create a new SparkContext.
@param master: Cluster URL to connect to
(e.g. mesos://host:port, spark://host:port, local[4]).
- @param jobName: A name for your job, to display on the cluster web UI
+ @param appName: A name for your job, to display on the cluster web UI.
@param sparkHome: Location where Spark is installed on cluster nodes.
@param pyFiles: Collection of .zip or .py files to send to the cluster
and add to PYTHONPATH. These can be paths on the local file
@@ -67,6 +68,7 @@ class SparkContext(object):
Java object. Set 1 to disable batching or -1 to use an
unlimited batch size.
@param serializer: The serializer for RDDs.
+ @param conf: A L{SparkConf} object setting Spark properties.
>>> from pyspark.context import SparkContext
@@ -79,10 +81,7 @@ class SparkContext(object):
"""
SparkContext._ensure_initialized(self)
- self.master = master
- self.jobName = jobName
- self.sparkHome = sparkHome or None # None becomes null in Py4J
- self.environment = environment or {}
+ self.conf = conf or SparkConf()
self._batchSize = batchSize # -1 represents an unlimited batch size
self._unbatched_serializer = serializer
if batchSize == 1:
@@ -91,10 +90,26 @@ class SparkContext(object):
self.serializer = BatchedSerializer(self._unbatched_serializer,
batchSize)
+ # Set parameters passed directly on our conf; these operations will be no-ops
+ # if the parameters were None
+ self.conf.setMaster(master)
+ self.conf.setAppName(appName)
+ self.conf.setSparkHome(sparkHome)
+ environment = environment or {}
+ for key, value in environment.iteritems():
+ self.conf.setExecutorEnv(key, value)
+
+ if not self.conf.contains("spark.master"):
+ raise Exception("A master URL must be set in your configuration")
+ if not self.conf.contains("spark.appName"):
+ raise Exception("An application name must be set in your configuration")
+
+ self.master = self.conf.get("spark.master")
+ self.appName = self.conf.get("spark.appName")
+ self.sparkHome = self.conf.getOrElse("spark.home", None)
+
# Create the Java SparkContext through Py4J
- empty_string_array = self._gateway.new_array(self._jvm.String, 0)
- self._jsc = self._jvm.JavaSparkContext(master, jobName, sparkHome,
- empty_string_array)
+ self._jsc = self._jvm.JavaSparkContext(self.conf._jconf)
# Create a single Accumulator in Java that we'll send all our updates through;
# they will be passed back to us through a TCP server
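
Note the precedence this hunk establishes: the stored conf is updated
with whichever constructor arguments are non-None, so a direct argument
overrides the same setting in a supplied SparkConf. A small sketch of
that behavior (names are illustrative):

    conf = SparkConf().setMaster("local").setAppName("from-conf")
    # appName passed directly wins, because setAppName is invoked on the
    # stored conf with a non-None argument; master still comes from conf.
    sc = SparkContext(appName="from-param", conf=conf)
    print(sc.appName)  # prints: from-param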
@@ -105,6 +120,7 @@ class SparkContext(object):
self._jvm.PythonAccumulatorParam(host, port))
self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')
+
# Broadcast's __reduce__ method stores Broadcast instances here.
# This allows other code to determine which Broadcast instances have
# been pickled, so it can determine which Java broadcast objects to
@@ -143,8 +159,8 @@ class SparkContext(object):
@classmethod
def setSystemProperty(cls, key, value):
"""
- Set a system property, such as spark.executor.memory. This must be
- invoked before instantiating SparkContext.
+ Set a Java system property, such as spark.executor.memory. This must
+ be invoked before instantiating SparkContext.
"""
SparkContext._ensure_initialized()
SparkContext._jvm.java.lang.System.setProperty(key, value)
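
For properties that must reach the JVM before the context exists,
setSystemProperty remains the documented route. A brief sketch (the
memory value and app name are illustrative):

    # Must be called before any SparkContext is instantiated
    # in this process.
    SparkContext.setSystemProperty("spark.executor.memory", "2g")
    sc = SparkContext("local", "SysPropDemo")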