From 7eaa56de7f0253869fa85d4366f1048386af477e Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava
Date: Sat, 19 Oct 2013 19:55:39 -0700
Subject: Add an add() method to pyspark accumulators.

Add a regular method for adding a term to accumulators in
pyspark. Currently if you have a non-global accumulator, adding to it
is awkward. The += operator can't be used for non-global accumulators
captured via closure because it involves an assignment. The only way
to do it is using __iadd__ directly.

Adding this method lets you write code like this:

    def main():
        sc = SparkContext()
        accum = sc.accumulator(0)
        rdd = sc.parallelize([1,2,3])
        def f(x):
            accum.add(x)
        rdd.foreach(f)
        print accum.value

where using accum += x instead would have caused UnboundLocalError
exceptions in workers. Currently it would have to be written as
accum.__iadd__(x).
---
 python/pyspark/accumulators.py | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

(limited to 'python')

diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index d367f91967..da3d96689a 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -42,6 +42,13 @@
 >>> a.value
 13
 
+>>> b = sc.accumulator(0)
+>>> def g(x):
+...     b.add(x)
+>>> rdd.foreach(g)
+>>> b.value
+6
+
 >>> from pyspark.accumulators import AccumulatorParam
 >>> class VectorAccumulatorParam(AccumulatorParam):
 ...     def zero(self, value):
@@ -139,9 +146,13 @@ class Accumulator(object):
             raise Exception("Accumulator.value cannot be accessed inside tasks")
         self._value = value
 
+    def add(self, term):
+        """Adds a term to this accumulator's value"""
+        self._value = self.accum_param.addInPlace(self._value, term)
+
     def __iadd__(self, term):
         """The += operator; adds a term to this accumulator's value"""
-        self._value = self.accum_param.addInPlace(self._value, term)
+        self.add(term)
         return self
 
     def __str__(self):
-- cgit v1.2.3


From 56d230e614d7d03a0c53e262071ab388abddd97f Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava
Date: Tue, 22 Oct 2013 00:22:37 -0700
Subject: Add classmethod to SparkContext to set system properties.

Add a new classmethod to SparkContext to set system properties, as is
possible in Scala/Java. Unlike the Java/Scala implementations, there's
no access to System until the JVM bridge is created. Since
SparkContext handles that, move the initialization of the JVM
connection to a separate classmethod that can safely be called
repeatedly as long as the same instance (or no instance) is provided.
---
 python/pyspark/context.py | 41 +++++++++++++++++++++++++++++------------
 1 file changed, 29 insertions(+), 12 deletions(-)

(limited to 'python')

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 597110321a..22f5d92a3b 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -49,6 +49,7 @@ class SparkContext(object):
     _lock = Lock()
     _python_includes = None # zip and egg files that need to be added to PYTHONPATH
 
+
     def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
         environment=None, batchSize=1024):
         """
@@ -67,18 +68,8 @@ class SparkContext(object):
                Java object. Set 1 to disable batching or -1 to use an
                unlimited batch size.
         """
-        with SparkContext._lock:
-            if SparkContext._active_spark_context:
-                raise ValueError("Cannot run multiple SparkContexts at once")
-            else:
-                SparkContext._active_spark_context = self
-                if not SparkContext._gateway:
-                    SparkContext._gateway = launch_gateway()
-                    SparkContext._jvm = SparkContext._gateway.jvm
-                    SparkContext._writeIteratorToPickleFile = \
-                        SparkContext._jvm.PythonRDD.writeIteratorToPickleFile
-                    SparkContext._takePartition = \
-                        SparkContext._jvm.PythonRDD.takePartition
+        SparkContext._ensure_initialized()
+
         self.master = master
         self.jobName = jobName
         self.sparkHome = sparkHome or None # None becomes null in Py4J
@@ -119,6 +110,32 @@ class SparkContext(object):
         self._temp_dir = \
             self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
 
+    @classmethod
+    def _ensure_initialized(cls, instance=None):
+        with SparkContext._lock:
+            if not SparkContext._gateway:
+                SparkContext._gateway = launch_gateway()
+                SparkContext._jvm = SparkContext._gateway.jvm
+                SparkContext._writeIteratorToPickleFile = \
+                    SparkContext._jvm.PythonRDD.writeIteratorToPickleFile
+                SparkContext._takePartition = \
+                    SparkContext._jvm.PythonRDD.takePartition
+
+            if instance:
+                if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
+                    raise ValueError("Cannot run multiple SparkContexts at once")
+                else:
+                    SparkContext._active_spark_context = instance
+
+    @classmethod
+    def setSystemProperty(cls, key, value):
+        """
+        Set a system property, such as spark.executor.memory. This must be
+        invoked before instantiating SparkContext.
+        """
+        SparkContext._ensure_initialized()
+        SparkContext._jvm.java.lang.System.setProperty(key, value)
+
     @property
     def defaultParallelism(self):
         """
-- cgit v1.2.3


From 317a9eb1ceb165a74493c452a6c5fc0f9b5e2760 Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava
Date: Tue, 22 Oct 2013 11:26:49 -0700
Subject: Pass self to SparkContext._ensure_initialized.

The constructor for SparkContext should pass in self so that we track
the current context and produce errors if another one is created. Add
a doctest to make sure creating multiple contexts triggers the
exception.
---
 python/pyspark/context.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

(limited to 'python')

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 22f5d92a3b..a7ca8bc888 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -67,8 +67,17 @@ class SparkContext(object):
         @param batchSize: The number of Python objects represented as a single
                Java object. Set 1 to disable batching or -1 to use an
                unlimited batch size.
+
+
+        >>> from pyspark.context import SparkContext
+        >>> sc = SparkContext('local', 'test')
+
+        >>> sc2 = SparkContext('local', 'test2') # doctest: +IGNORE_EXCEPTION_DETAIL
+        Traceback (most recent call last):
+            ...
+        ValueError:...
         """
-        SparkContext._ensure_initialized()
+        SparkContext._ensure_initialized(self)
 
         self.master = master
         self.jobName = jobName
-- cgit v1.2.3