diff options
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r-- | python/pyspark/context.py | 59 |
1 files changed, 43 insertions, 16 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 108f36576a..f955aad7a4 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -24,6 +24,7 @@ from tempfile import NamedTemporaryFile from pyspark import accumulators from pyspark.accumulators import Accumulator from pyspark.broadcast import Broadcast +from pyspark.conf import SparkConf from pyspark.files import SparkFiles from pyspark.java_gateway import launch_gateway from pyspark.serializers import PickleSerializer, BatchedSerializer, MUTF8Deserializer @@ -49,14 +50,15 @@ class SparkContext(object): _python_includes = None # zip and egg files that need to be added to PYTHONPATH - def __init__(self, master, jobName, sparkHome=None, pyFiles=None, - environment=None, batchSize=1024, serializer=PickleSerializer()): + def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, + environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None): """ - Create a new SparkContext. + Create a new SparkContext. At least the master and app name should be set, + either through the named parameters here or through C{conf}. @param master: Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). - @param jobName: A name for your job, to display on the cluster web UI + @param appName: A name for your job, to display on the cluster web UI. @param sparkHome: Location where Spark is installed on cluster nodes. @param pyFiles: Collection of .zip or .py files to send to the cluster and add to PYTHONPATH. These can be paths on the local file @@ -67,6 +69,7 @@ class SparkContext(object): Java object. Set 1 to disable batching or -1 to use an unlimited batch size. @param serializer: The serializer for RDDs. + @param conf: A L{SparkConf} object setting Spark properties. >>> from pyspark.context import SparkContext @@ -79,10 +82,8 @@ class SparkContext(object): """ SparkContext._ensure_initialized(self) - self.master = master - self.jobName = jobName - self.sparkHome = sparkHome or None # None becomes null in Py4J self.environment = environment or {} + self._conf = conf or SparkConf(_jvm=self._jvm) self._batchSize = batchSize # -1 represents an unlimited batch size self._unbatched_serializer = serializer if batchSize == 1: @@ -91,10 +92,35 @@ class SparkContext(object): self.serializer = BatchedSerializer(self._unbatched_serializer, batchSize) + # Set any parameters passed directly to us on the conf + if master: + self._conf.setMaster(master) + if appName: + self._conf.setAppName(appName) + if sparkHome: + self._conf.setSparkHome(sparkHome) + if environment: + for key, value in environment.iteritems(): + self._conf.setExecutorEnv(key, value) + + # Check that we have at least the required parameters + if not self._conf.contains("spark.master"): + raise Exception("A master URL must be set in your configuration") + if not self._conf.contains("spark.app.name"): + raise Exception("An application name must be set in your configuration") + + # Read back our properties from the conf in case we loaded some of them from + # the classpath or an external config file + self.master = self._conf.get("spark.master") + self.appName = self._conf.get("spark.app.name") + self.sparkHome = self._conf.get("spark.home", None) + for (k, v) in self._conf.getAll(): + if k.startswith("spark.executorEnv."): + varName = k[len("spark.executorEnv."):] + self.environment[varName] = v + # Create the Java SparkContext through Py4J - empty_string_array = self._gateway.new_array(self._jvm.String, 0) - self._jsc = self._jvm.JavaSparkContext(master, jobName, sparkHome, - empty_string_array) + self._jsc = self._jvm.JavaSparkContext(self._conf._jconf) # Create a single Accumulator in Java that we'll send all our updates through; # they will be passed back to us through a TCP server @@ -105,6 +131,7 @@ class SparkContext(object): self._jvm.PythonAccumulatorParam(host, port)) self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python') + # Broadcast's __reduce__ method stores Broadcast instances here. # This allows other code to determine which Broadcast instances have # been pickled, so it can determine which Java broadcast objects to @@ -121,7 +148,7 @@ class SparkContext(object): self.addPyFile(path) # Create a temporary directory inside spark.local.dir: - local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir() + local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf()) self._temp_dir = \ self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath() @@ -131,8 +158,7 @@ class SparkContext(object): if not SparkContext._gateway: SparkContext._gateway = launch_gateway() SparkContext._jvm = SparkContext._gateway.jvm - SparkContext._writeToFile = \ - SparkContext._jvm.PythonRDD.writeToFile + SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile if instance: if SparkContext._active_spark_context and SparkContext._active_spark_context != instance: @@ -143,8 +169,8 @@ class SparkContext(object): @classmethod def setSystemProperty(cls, key, value): """ - Set a system property, such as spark.executor.memory. This must be - invoked before instantiating SparkContext. + Set a Java system property, such as spark.executor.memory. This must + must be invoked before instantiating SparkContext. """ SparkContext._ensure_initialized() SparkContext._jvm.java.lang.System.setProperty(key, value) @@ -243,7 +269,8 @@ class SparkContext(object): def broadcast(self, value): """ - Broadcast a read-only variable to the cluster, returning a C{Broadcast} + Broadcast a read-only variable to the cluster, returning a + L{Broadcast<pyspark.broadcast.Broadcast>} object for reading it in distributed functions. The variable will be sent to each cluster only once. """ |