From cd00225db9b90fc845fd1458831bdd9d014d1bb6 Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Sun, 29 Dec 2013 14:03:39 -0500
Subject: Add SparkConf support in Python

---
 python/pyspark/context.py | 40 ++++++++++++++++++++++++++++------------
 1 file changed, 28 insertions(+), 12 deletions(-)

(limited to 'python/pyspark/context.py')

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index c0645b2847..97c1526afd 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -24,6 +24,7 @@ from tempfile import NamedTemporaryFile
 from pyspark import accumulators
 from pyspark.accumulators import Accumulator
 from pyspark.broadcast import Broadcast
+from pyspark.conf import SparkConf
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, MUTF8Deserializer
@@ -49,14 +50,14 @@ class SparkContext(object):
     _python_includes = None # zip and egg files that need to be added to PYTHONPATH
 
-    def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
-        environment=None, batchSize=1024, serializer=PickleSerializer()):
+    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
+        environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None):
         """
         Create a new SparkContext.
 
         @param master: Cluster URL to connect to
               (e.g. mesos://host:port, spark://host:port, local[4]).
-        @param jobName: A name for your job, to display on the cluster web UI
+        @param appName: A name for your job, to display on the cluster web UI.
         @param sparkHome: Location where Spark is installed on cluster nodes.
         @param pyFiles: Collection of .zip or .py files to send to the cluster
               and add to PYTHONPATH.  These can be paths on the local file
@@ -67,6 +68,7 @@ class SparkContext(object):
               Java object.  Set 1 to disable batching or -1 to use an
               unlimited batch size.
         @param serializer: The serializer for RDDs.
+        @param conf: A L{SparkConf} object setting Spark properties.
         >>> from pyspark.context import SparkContext
@@ -79,10 +81,7 @@ class SparkContext(object):
         """
         SparkContext._ensure_initialized(self)
 
-        self.master = master
-        self.jobName = jobName
-        self.sparkHome = sparkHome or None # None becomes null in Py4J
-        self.environment = environment or {}
+        self.conf = conf or SparkConf()
         self._batchSize = batchSize  # -1 represents an unlimited batch size
         self._unbatched_serializer = serializer
         if batchSize == 1:
@@ -91,10 +90,26 @@ class SparkContext(object):
             self.serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
 
+        # Set parameters passed directly on our conf; these operations will be no-ops
+        # if the parameters were None
+        self.conf.setMaster(master)
+        self.conf.setAppName(appName)
+        self.conf.setSparkHome(sparkHome)
+        environment = environment or {}
+        for key, value in environment.iteritems():
+            self.conf.setExecutorEnv(key, value)
+
+        if not self.conf.contains("spark.master"):
+            raise Exception("A master URL must be set in your configuration")
+        if not self.conf.contains("spark.appName"):
+            raise Exception("An application name must be set in your configuration")
+
+        self.master = self.conf.get("spark.master")
+        self.appName = self.conf.get("spark.appName")
+        self.sparkHome = self.conf.getOrElse("spark.home", None)
+
         # Create the Java SparkContext through Py4J
-        empty_string_array = self._gateway.new_array(self._jvm.String, 0)
-        self._jsc = self._jvm.JavaSparkContext(master, jobName, sparkHome,
-            empty_string_array)
+        self._jsc = self._jvm.JavaSparkContext(self.conf._jconf)
 
         # Create a single Accumulator in Java that we'll send all our updates through;
         # they will be passed back to us through a TCP server
@@ -105,6 +120,7 @@ class SparkContext(object):
             self._jvm.PythonAccumulatorParam(host, port))
 
         self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')
+
         # Broadcast's __reduce__ method stores Broadcast instances here.
         # This allows other code to determine which Broadcast instances have
         # been pickled, so it can determine which Java broadcast objects to
@@ -143,8 +159,8 @@ class SparkContext(object):
     @classmethod
     def setSystemProperty(cls, key, value):
         """
-        Set a system property, such as spark.executor.memory. This must be
-        invoked before instantiating SparkContext.
+        Set a Java system property, such as spark.executor.memory. This must
+        be invoked before instantiating SparkContext.
         """
         SparkContext._ensure_initialized()
         SparkContext._jvm.java.lang.System.setProperty(key, value)
-- 
cgit v1.2.3
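
Usage note (not part of the patch above): the sketch below shows how the new conf keyword argument could be driven from application code. It assumes the pyspark.conf.SparkConf setters referenced in the diff (setMaster, setAppName, setExecutorEnv) return the conf object so calls can be chained; the master URL, application name, and environment variable are illustrative placeholders.

    from pyspark.conf import SparkConf
    from pyspark.context import SparkContext

    # Build a SparkConf first; every value set here is illustrative.
    conf = (SparkConf()
            .setMaster("local[4]")                # assumed local-mode master URL
            .setAppName("ExampleApp")             # hypothetical application name
            .setExecutorEnv("EXAMPLE_VAR", "1"))  # optional executor environment setting

    # Pass the conf through the keyword argument added by this patch.
    sc = SparkContext(conf=conf)

    # The older positional style still works: a master and appName passed directly
    # are copied onto the conf inside __init__ via the setMaster/setAppName calls
    # shown in the diff.
    # sc = SparkContext("local[4]", "ExampleApp")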