path: root/python
author     Raymond Liu <raymond.liu@intel.com>  2013-11-12 15:14:21 +0800
committer  Raymond Liu <raymond.liu@intel.com>  2013-11-13 16:55:11 +0800
commit     0f2e3c6e31d56c627ff81cdc93289a7c7cb2ec16 (patch)
tree       60f01110b170ff72347e1ae6209f898712578ed3 /python
parent     5429d62dfa16305eb23d67dfe38172803c80db65 (diff)
parent     3d4ad84b63e440fd3f4b3edb1b120ff7c14a42d1 (diff)
download   spark-0f2e3c6e31d56c627ff81cdc93289a7c7cb2ec16.tar.gz
           spark-0f2e3c6e31d56c627ff81cdc93289a7c7cb2ec16.tar.bz2
           spark-0f2e3c6e31d56c627ff81cdc93289a7c7cb2ec16.zip
Merge branch 'master' into scala-2.10
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/accumulators.py  13
-rw-r--r--  python/pyspark/context.py       50
2 files changed, 50 insertions(+), 13 deletions(-)
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index d367f91967..da3d96689a 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -42,6 +42,13 @@
 >>> a.value
 13
 
+>>> b = sc.accumulator(0)
+>>> def g(x):
+...     b.add(x)
+>>> rdd.foreach(g)
+>>> b.value
+6
+
 >>> from pyspark.accumulators import AccumulatorParam
 >>> class VectorAccumulatorParam(AccumulatorParam):
 ...     def zero(self, value):
@@ -139,9 +146,13 @@ class Accumulator(object):
             raise Exception("Accumulator.value cannot be accessed inside tasks")
         self._value = value
 
+    def add(self, term):
+        """Adds a term to this accumulator's value"""
+        self._value = self.accum_param.addInPlace(self._value, term)
+
     def __iadd__(self, term):
         """The += operator; adds a term to this accumulator's value"""
-        self._value = self.accum_param.addInPlace(self._value, term)
+        self.add(term)
         return self
 
     def __str__(self):
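For illustration only (a sketch, not part of the patch): driver-side usage of the new Accumulator.add method, mirroring the doctest added above. It assumes pyspark is on the path, no other SparkContext is active, and the app name is a placeholder; sc.accumulator, rdd.foreach and the += path through __iadd__ are existing PySpark APIs.

>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'accumulator-add-example')  # placeholder app name
>>> rdd = sc.parallelize([1, 2, 3])
>>> acc = sc.accumulator(0)
>>> def add_to_acc(x):
...     acc.add(x)          # explicit add(); += inside tasks now goes through the same method
>>> rdd.foreach(add_to_acc)
>>> acc.value               # readable only on the driver
6
>>> acc += 10               # __iadd__ path, now implemented via add()
>>> acc.value
16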
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 597110321a..a7ca8bc888 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -49,6 +49,7 @@ class SparkContext(object):
     _lock = Lock()
     _python_includes = None # zip and egg files that need to be added to PYTHONPATH
+
     def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
         environment=None, batchSize=1024):
         """
@@ -66,19 +67,18 @@ class SparkContext(object):
         @param batchSize: The number of Python objects represented as a single
                Java object. Set 1 to disable batching or -1 to use an
                unlimited batch size.
+
+
+        >>> from pyspark.context import SparkContext
+        >>> sc = SparkContext('local', 'test')
+
+        >>> sc2 = SparkContext('local', 'test2') # doctest: +IGNORE_EXCEPTION_DETAIL
+        Traceback (most recent call last):
+            ...
+        ValueError:...
         """
-        with SparkContext._lock:
-            if SparkContext._active_spark_context:
-                raise ValueError("Cannot run multiple SparkContexts at once")
-            else:
-                SparkContext._active_spark_context = self
-        if not SparkContext._gateway:
-            SparkContext._gateway = launch_gateway()
-            SparkContext._jvm = SparkContext._gateway.jvm
-            SparkContext._writeIteratorToPickleFile = \
-                SparkContext._jvm.PythonRDD.writeIteratorToPickleFile
-            SparkContext._takePartition = \
-                SparkContext._jvm.PythonRDD.takePartition
+        SparkContext._ensure_initialized(self)
+
         self.master = master
         self.jobName = jobName
         self.sparkHome = sparkHome or None # None becomes null in Py4J
@@ -119,6 +119,32 @@
         self._temp_dir = \
             self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
 
+    @classmethod
+    def _ensure_initialized(cls, instance=None):
+        with SparkContext._lock:
+            if not SparkContext._gateway:
+                SparkContext._gateway = launch_gateway()
+                SparkContext._jvm = SparkContext._gateway.jvm
+                SparkContext._writeIteratorToPickleFile = \
+                    SparkContext._jvm.PythonRDD.writeIteratorToPickleFile
+                SparkContext._takePartition = \
+                    SparkContext._jvm.PythonRDD.takePartition
+
+            if instance:
+                if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
+                    raise ValueError("Cannot run multiple SparkContexts at once")
+                else:
+                    SparkContext._active_spark_context = instance
+
+    @classmethod
+    def setSystemProperty(cls, key, value):
+        """
+        Set a system property, such as spark.executor.memory. This must be
+        invoked before instantiating SparkContext.
+        """
+        SparkContext._ensure_initialized()
+        SparkContext._jvm.java.lang.System.setProperty(key, value)
+
     @property
     def defaultParallelism(self):
         """