Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r--  python/pyspark/context.py  59
1 file changed, 43 insertions(+), 16 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 108f36576a..f955aad7a4 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -24,6 +24,7 @@ from tempfile import NamedTemporaryFile
from pyspark import accumulators
from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast
+from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, MUTF8Deserializer
@@ -49,14 +50,15 @@ class SparkContext(object):
_python_includes = None # zip and egg files that need to be added to PYTHONPATH
- def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
- environment=None, batchSize=1024, serializer=PickleSerializer()):
+ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
+ environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None):
"""
- Create a new SparkContext.
+ Create a new SparkContext. At least the master and app name should be set,
+ either through the named parameters here or through C{conf}.
@param master: Cluster URL to connect to
(e.g. mesos://host:port, spark://host:port, local[4]).
- @param jobName: A name for your job, to display on the cluster web UI
+ @param appName: A name for your application, to display on the cluster web UI.
@param sparkHome: Location where Spark is installed on cluster nodes.
@param pyFiles: Collection of .zip or .py files to send to the cluster
and add to PYTHONPATH. These can be paths on the local file
@@ -67,6 +69,7 @@ class SparkContext(object):
Java object. Set 1 to disable batching or -1 to use an
unlimited batch size.
@param serializer: The serializer for RDDs.
+ @param conf: A L{SparkConf} object setting Spark properties.
>>> from pyspark.context import SparkContext
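A minimal usage sketch of the two construction paths this change enables (not part of the patch; the local[2] master and app name are illustrative values):

    from pyspark.conf import SparkConf
    from pyspark.context import SparkContext

    # Path 1: named parameters, as before, with jobName renamed to appName
    sc = SparkContext(master="local[2]", appName="example-app")
    sc.stop()

    # Path 2: the same settings carried by a SparkConf object
    conf = SparkConf().setMaster("local[2]").setAppName("example-app")
    sc = SparkContext(conf=conf)
    sc.stop()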
@@ -79,10 +82,8 @@ class SparkContext(object):
"""
SparkContext._ensure_initialized(self)
- self.master = master
- self.jobName = jobName
- self.sparkHome = sparkHome or None # None becomes null in Py4J
self.environment = environment or {}
+ self._conf = conf or SparkConf(_jvm=self._jvm)
self._batchSize = batchSize # -1 represents an unlimited batch size
self._unbatched_serializer = serializer
if batchSize == 1:
@@ -91,10 +92,35 @@ class SparkContext(object):
self.serializer = BatchedSerializer(self._unbatched_serializer,
batchSize)
+ # Set any parameters passed directly to us on the conf
+ if master:
+ self._conf.setMaster(master)
+ if appName:
+ self._conf.setAppName(appName)
+ if sparkHome:
+ self._conf.setSparkHome(sparkHome)
+ if environment:
+ for key, value in environment.iteritems():
+ self._conf.setExecutorEnv(key, value)
+
+ # Check that we have at least the required parameters
+ if not self._conf.contains("spark.master"):
+ raise Exception("A master URL must be set in your configuration")
+ if not self._conf.contains("spark.app.name"):
+ raise Exception("An application name must be set in your configuration")
+
+ # Read back our properties from the conf in case we loaded some of them from
+ # the classpath or an external config file
+ self.master = self._conf.get("spark.master")
+ self.appName = self._conf.get("spark.app.name")
+ self.sparkHome = self._conf.get("spark.home", None)
+ for (k, v) in self._conf.getAll():
+ if k.startswith("spark.executorEnv."):
+ varName = k[len("spark.executorEnv."):]
+ self.environment[varName] = v
+
# Create the Java SparkContext through Py4J
- empty_string_array = self._gateway.new_array(self._jvm.String, 0)
- self._jsc = self._jvm.JavaSparkContext(master, jobName, sparkHome,
- empty_string_array)
+ self._jsc = self._jvm.JavaSparkContext(self._conf._jconf)
# Create a single Accumulator in Java that we'll send all our updates through;
# they will be passed back to us through a TCP server
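An illustrative sketch of the precedence logic above (not part of the patch; DATA_DIR and LOG_LEVEL are made-up variables): named parameters are copied onto the conf, the effective values are then read back from it, and a conf that still lacks spark.master or spark.app.name raises an exception.

    from pyspark.conf import SparkConf
    from pyspark.context import SparkContext

    conf = (SparkConf()
            .setMaster("local[2]")
            .setExecutorEnv("DATA_DIR", "/tmp/data"))      # set via the conf
    sc = SparkContext(appName="example-app",               # set via a named parameter
                      environment={"LOG_LEVEL": "INFO"},   # copied to spark.executorEnv.*
                      conf=conf)

    print(sc.master)       # "local[2]", read back from spark.master
    print(sc.appName)      # "example-app", read back from spark.app.name
    print(sc.environment)  # contains both DATA_DIR and LOG_LEVEL
    sc.stop()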
@@ -105,6 +131,7 @@ class SparkContext(object):
self._jvm.PythonAccumulatorParam(host, port))
self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')
+
# Broadcast's __reduce__ method stores Broadcast instances here.
# This allows other code to determine which Broadcast instances have
# been pickled, so it can determine which Java broadcast objects to
@@ -121,7 +148,7 @@ class SparkContext(object):
self.addPyFile(path)
# Create a temporary directory inside spark.local.dir:
- local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir()
+ local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
self._temp_dir = \
self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
@@ -131,8 +158,7 @@ class SparkContext(object):
if not SparkContext._gateway:
SparkContext._gateway = launch_gateway()
SparkContext._jvm = SparkContext._gateway.jvm
- SparkContext._writeToFile = \
- SparkContext._jvm.PythonRDD.writeToFile
+ SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
if instance:
if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
@@ -143,8 +169,8 @@ class SparkContext(object):
@classmethod
def setSystemProperty(cls, key, value):
"""
- Set a system property, such as spark.executor.memory. This must be
- invoked before instantiating SparkContext.
+ Set a Java system property, such as spark.executor.memory. This must
+ be invoked before instantiating SparkContext.
"""
SparkContext._ensure_initialized()
SparkContext._jvm.java.lang.System.setProperty(key, value)
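A usage sketch for setSystemProperty (not part of the patch; the 2g value is illustrative): the property only takes effect if it is set before the first SparkContext is constructed in the process.

    from pyspark.context import SparkContext

    SparkContext.setSystemProperty("spark.executor.memory", "2g")
    sc = SparkContext(master="local[2]", appName="example-app")
    sc.stop()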
@@ -243,7 +269,8 @@ class SparkContext(object):
def broadcast(self, value):
"""
- Broadcast a read-only variable to the cluster, returning a C{Broadcast}
+ Broadcast a read-only variable to the cluster, returning a
+ L{Broadcast<pyspark.broadcast.Broadcast>}
object for reading it in distributed functions. The variable will be
sent to each cluster only once.
"""